Client Libraries
Supported client APIs:
tip
Contact your admin for access credentials to artifacts repositories.
Client language | Supported versions | Supported OS | Artifacts Repository |
---|---|---|---|
Java | 11-14 | macOS, Windows, Linux | Link |
Python | 3.6, 3.7, 3.8, 3.9, 3.10 | macOS, Windows, Linux | Link |
.NET | netstandard2.0 | macOS, Windows, Linux | Link |
C++ | - | macOS, Windows, Linux | Link |
You can install the TimeBase Python client with one of the following commands:
# Install a specific version (replace <user> and <password> with the repository credentials provided by your admin)
pip install dxapi==5.4.26 --extra-index-url https://<user>:<password>@nexus.deltixhub.com/repository/epm-rtc-public-python/simple
# Install the latest version
pip install dxapi --extra-index-url https://<user>:<password>@nexus.deltixhub.com/repository/epm-rtc-public-python/simple
# Install the CentOS 7 build (available for version 5.4.18 only)
pip install dxapi==5.4.18+centos --extra-index-url https://<user>:<password>@nexus.deltixhub.com/repository/epm-rtc-public-python/simple
Prerequisites
Before running samples, please make sure you have a TimeBase Server running on your local machine on port 8011.
Sample Message
- Java
- .NET
/**
 * Sample bar message used by the Java snippets on this page: open/high/low/close
 * prices, traded volume and the exchange code, on top of the fields inherited
 * from {@code InstrumentMessage}.
 */
public class MyBarMessage extends InstrumentMessage {
    public double closePrice;
    public double openPrice;
    public double highPrice;
    public double lowPrice;
    public double volume;
    public String exchange;

    /**
     * Appends a JSON-style rendering of this message to the given builder.
     *
     * @param sb builder to append to
     * @return the same builder, to allow call chaining
     */
    @Override
    public StringBuilder toString(StringBuilder sb) {
        sb.append("{ \"$type\": \"MyBarMessage\"");
        sb.append(", \"closePrice\": ").append(closePrice);
        sb.append(", \"openPrice\": ").append(openPrice);
        sb.append(", \"highPrice\": ").append(highPrice);
        sb.append(", \"lowPrice\": ").append(lowPrice);
        sb.append(", \"volume\": ").append(volume);
        // The exchange is optional; leave the key out rather than printing "null".
        if (exchange != null) {
            sb.append(", \"exchange\": \"").append(exchange).append('"');
        }
        // Close the object opened by the first append so the output is balanced.
        sb.append(" }");
        return sb;
    }
}
/// <summary>
/// Sample bar message used by the .NET snippets on this page: open/high/low/close
/// prices, traded volume and the exchange code, on top of the members inherited
/// from InstrumentMessage.
/// </summary>
public class MyBarMessage : InstrumentMessage
{
    public double ClosePrice;
    public double OpenPrice;
    public double HighPrice;
    public double LowPrice;
    public double Volume;
    public String Exchange;

    /// <summary>
    /// Appends the object state to the given StringBuilder in a JSON-style form.
    /// </summary>
    /// <param name="builder">Builder to append to.</param>
    /// <returns>The StringBuilder that the object was appended to.</returns>
    public override StringBuilder ToString(StringBuilder builder)
    {
        builder.Append("{ \"$type\": \"MyBarMessage\"");
        builder.Append(", \"closePrice\": ").Append(ClosePrice);
        builder.Append(", \"openPrice\": ").Append(OpenPrice);
        builder.Append(", \"highPrice\": ").Append(HighPrice);
        builder.Append(", \"lowPrice\": ").Append(LowPrice);
        builder.Append(", \"volume\": ").Append(Volume);
        // The exchange is optional; leave the key out when it has not been set.
        if (Exchange != null)
        {
            builder.Append(", \"exchange\": \"").Append(Exchange).Append('"');
        }
        // Close the object opened by the first Append so the output is balanced.
        builder.Append(" }");
        return builder;
    }
}
Create Stream
- Java
- Python
- C++
- .NET
/**
 * Creates the durable "mybars" stream whose schema is derived from
 * {@link MyBarMessage}. An existing stream with the same key is deleted first.
 */
public void createStream() {
    final String url = "dxtick://localhost:8011";
    final String key = "mybars";

    // try-with-resources closes the connection when the block exits.
    try (DXTickDB timebase = TickDBFactory.createFromUrl(url)) {
        // The connection must be opened before use; false = not read-only.
        timebase.open(false);

        // Build the record descriptor (schema) by introspecting MyBarMessage.
        final RecordClassDescriptor barDescriptor =
                Introspector.createEmptyMessageIntrospector().introspectRecordClass(MyBarMessage.class);

        // Durable (persistent) fixed-type stream with maximum distribution (factor 0).
        final StreamOptions barOptions =
                StreamOptions.fixedType(StreamScope.DURABLE, key, "Bar Messages", 0, barDescriptor);

        // Drop a stream left over from a previous run, if there is one.
        final DXTickStream existing = timebase.getStream(key);
        if (existing != null) {
            existing.delete();
        }

        final DXTickStream created = timebase.createStream(key, barOptions);
        System.out.println(created.getKey() + " successfully created using Introspector");
    }
}
import dxapi

# Timebase URL specification, pattern is "dxtick://<host>:<port>".
# The samples assume a local TimeBase Server on port 8011 (see Prerequisites).
timebase = 'dxtick://localhost:8011'

# Bind the name before entering the try block so that the finally clause can
# always reference it, even when createFromUrl() itself raises.
db = None
try:
    # Create timebase connection
    db = dxapi.TickDb.createFromUrl(timebase)
    # Open in read-write mode (the argument is the read-only flag)
    db.open(False)

    # Generate the stream schema from the standard BarMessage type
    schema = db.generateSchema(['deltix.timebase.api.messages.BarMessage'])
    options = dxapi.StreamOptions()
    options.schema(schema)

    stream = db.createStream('MyBarsStream2', options)
finally:
    # database connection should be closed anyway
    if db is not None and db.isOpen():
        db.close()
        print("Connection " + timebase + " closed.")
void create_stream() {
// Connect to TimeBase
unique_ptr<TickDb> db(TickDb::createFromUrl("dxtick://localhost:8011"));
db->open(false);
// Prepare QQL create stream DDL
std::string qql = "CREATE DURABLE STREAM \"bars1min\" 'bars1min' ( \
CLASS \"deltix.timebase.api.messages.MarketMessage\" 'Market Message' ( \
STATIC \"originalTimestamp\" TIMESTAMP = NULL, \
STATIC \"currencyCode\" 'Currency Code' INTEGER = 999, \
STATIC \"sequenceNumber\" '' INTEGER = NULL, \
STATIC \"sourceId\" '' VARCHAR = NULL \
) NOT INSTANTIABLE; \
CLASS \"deltix.timebase.api.messages.BarMessage\" 'Bar Message' UNDER \"deltix.timebase.api.messages.MarketMessage\" ( \
\"exchangeId\" 'Exchange Code' VARCHAR, \
\"close\" 'Close' FLOAT DECIMAL, \
\"open\" 'Open' FLOAT DECIMAL RELATIVE TO \"close\", \
\"high\" 'High' FLOAT DECIMAL RELATIVE TO \"close\", \
\"low\" 'Low' FLOAT DECIMAL RELATIVE TO \"close\", \
\"volume\" 'Volume' FLOAT DECIMAL \
); \
) \
OPTIONS(FIXEDTYPE; PERIODICITY = '1I'; HIGHAVAILABILITY = TRUE) \
COMMENT 'bars1min'";
// Execute query
unique_ptr<TickCursor> cursor(db->executeQuery(qql, std::vector<DxApi::QueryParameter>()));
// Read result
InstrumentMessage header;
if (cursor->next(&header)) {
std::string details, errorType, text;
DataReader &reader = cursor->getReader();
reader.readUTF8(details);
reader.readUTF8(errorType);
int8_t errorLevel = reader.readEnum8();
reader.readUTF8(text);
std::cout << "Query result: " << errorType << " with text: " << text << std::endl;
}
// Close cursor and connection
cursor->close();
db->close();
}
/// <summary>
/// Creates the durable "mybars" stream whose schema is derived from MyBarMessage.
/// An existing stream with the same key is deleted first.
/// </summary>
public void CreateStream()
{
    String connection = "dxtick://localhost:8011";

    // Create Timebase connection using connection string; 'using' disposes it on exit.
    using (ITickDb db = TickDbFactory.CreateFromUrl(connection))
    {
        // The connection must be opened in read-write mode (false = not read-only).
        db.Open(false);

        String streamName = "mybars";

        // Delete the stream if it already exists.
        ITickStream bars = db.GetStream(streamName);
        if (bars != null)
            bars.Delete();

        StreamOptions streamOptions = new StreamOptions(StreamScope.Durable, streamName, "Bar Messages", 0);

        // Introspect the message class itself; passing null gives the introspector
        // nothing to build the schema from.
        RecordClassDescriptor descriptor = Introspector.CreateMessageIntrospector().IntrospectRecordClass(typeof(MyBarMessage));
        streamOptions.SetFixedType(descriptor);

        // Create the stream.
        ITickStream stream = db.CreateStream(streamName, streamOptions);
        Console.WriteLine(stream.Key + " successfully created using Introspector");
    }
}
Write
- Java
- Python
- C++
- .NET
/**
 * Writes two {@link MyBarMessage} bars (AAPL, then GOOG) into the existing
 * "mybars" stream, reusing one message instance for both sends.
 */
public void writeStream() {
    final String url = "dxtick://localhost:8011";

    // Step 1: connect. try-with-resources closes the connection on exit.
    try (DXTickDB timebase = TickDBFactory.createFromUrl(url)) {
        // The connection must be opened before use; false = not read-only.
        timebase.open(false);

        // Step 2: look up the stream created earlier.
        final DXTickStream bars = timebase.getStream("mybars");

        // Step 3: the loader is the write channel for the stream.
        try (TickLoader out = bars.createLoader()) {
            final MyBarMessage bar = new MyBarMessage();

            // Message time: 2020-03-09 00:00:00 UTC as epoch milliseconds.
            final long epochMs = LocalDateTime.of(2020, Month.MARCH, 9, 0, 0, 0)
                    .atZone(ZoneId.of("UTC"))
                    .toInstant()
                    .toEpochMilli();
            bar.setTimeStampMs(epochMs);

            // First bar: Apple.
            bar.setSymbol("AAPL");
            bar.openPrice = 263.75;
            bar.highPrice = 278.09;
            bar.lowPrice = 263.00;
            bar.closePrice = 266.17;
            bar.volume = 71_690_000;
            bar.exchange = "NYSE";

            System.out.println("Sending 1st message: " + bar);
            out.send(bar);

            // Second bar: Google. Same instance; timestamp and exchange carry over.
            bar.setSymbol("GOOG");
            bar.openPrice = 1205.3;
            bar.highPrice = 1254.76;
            bar.lowPrice = 1200.00;
            bar.closePrice = 1215.56;
            bar.volume = 33_700_000;

            System.out.println("Sending 2nd message: " + bar);
            out.send(bar);
        }
    }
}
from datetime import datetime

import dxapi

# Timebase URL specification, pattern is "dxtick://<host>:<port>"
timebase = 'dxtick://localhost:8011'

# Bind the name before entering the try block so that the finally clause can
# always reference it, even when createFromUrl() itself raises.
db = None
try:
    # Create timebase connection
    db = dxapi.TickDb.createFromUrl(timebase)
    # Open in read-write mode (the argument is the read-only flag)
    db.open(False)
    print('Connected to ' + timebase)

    # Define name of the stream
    streamKey = 'bars1min'
    # Get stream from the timebase
    stream = db.getStream(streamKey)

    # Create a Message Loader for the selected stream and provide loading options
    loader = stream.createLoader(dxapi.LoadingOptions())
    try:
        # Create Bar message
        barMessage = dxapi.InstrumentMessage()
        # Define message type name according to the Timebase schema type name.
        # For polymorphic streams, each message must define typeName so that the
        # Timebase Server can tell the message types apart.
        barMessage.typeName = 'deltix.timebase.api.messages.BarMessage'
        print('Start loading to ' + streamKey)

        # Time elapsed since the Unix epoch, in UTC
        now = datetime.utcnow() - datetime(1970, 1, 1)
        # Message timestamp is Epoch time in nanoseconds. Build it from the
        # timedelta's integer components: total_seconds() already contains the
        # microseconds (adding them again would count them twice), and a float
        # cannot represent a present-day nanosecond timestamp exactly.
        ns = (now.days * 86400 + now.seconds) * 10**9 + now.microseconds * 1000

        barMessage.instrumentType = 'EQUITY'
        barMessage.symbol = 'AAPL'
        barMessage.timestamp = ns
        barMessage.exchangeId = 'NYSE'
        barMessage.open = 263.75
        barMessage.high = 278.09
        barMessage.low = 263.00
        barMessage.close = 266.17
        barMessage.volume = 71_690_000
        # Send message
        loader.send(barMessage)

        # Reuse the same message object for the second bar
        barMessage.symbol = 'GOOG'
        barMessage.timestamp = ns
        barMessage.exchangeId = 'NYSE'
        barMessage.open = 1205.3
        barMessage.high = 1254.76
        barMessage.low = 1200.00
        barMessage.close = 1215.56
        barMessage.volume = 33_700_000
        # Send message
        loader.send(barMessage)
    finally:
        # Message Loader should be closed anyway
        loader.close()
        loader = None
finally:
    # database connection should be closed anyway
    if db is not None and db.isOpen():
        db.close()
        print("Connection " + timebase + " closed.")
// Writes two bar messages (AAPL, then GOOG) into the "bars1min" stream.
// Field values are written in the order the BarMessage fields are declared in
// the stream's DDL: exchangeId, close, open, high, low, volume.
void write_stream() {
    // Connect to TimeBase (false = not read-only)
    unique_ptr<TickDb> db(TickDb::createFromUrl("dxtick://localhost:8011"));
    db->open(false);

    // Get stream
    TickStream *stream = db->getStream("bars1min");

    // Create loader
    unique_ptr<TickLoader> loader(stream->createLoader(LoadingOptions()));
    DataWriter &writer = loader->getWriter();

    // Current time as nanoseconds since the Unix epoch. uint64_t replaces the
    // MSVC-only 'unsigned __int64' so the sample also builds on macOS and Linux.
    const uint64_t timestamp = static_cast<uint64_t>(
        std::chrono::duration_cast<std::chrono::nanoseconds>(
            std::chrono::system_clock::now().time_since_epoch()
        ).count());

    // send first message
    auto instrument = loader->getInstrumentId(InstrumentIdentity(InstrumentType::EQUITY, "AAPL"));
    // NOTE(review): the leading 0 is presumably the message type index — confirm against the API.
    loader->beginMessage(0, instrument, timestamp);
    writer.writeUTF8(std::string("NYSE")); // exchange id
    writer.writeUInt64(deltix::dfp::Decimal64(266.17f).toUnderlying()); // close
    writer.writeUInt64(deltix::dfp::Decimal64(263.75f).toUnderlying()); // open
    writer.writeUInt64(deltix::dfp::Decimal64(278.09f).toUnderlying()); // high
    writer.writeUInt64(deltix::dfp::Decimal64(263.00f).toUnderlying()); // low
    writer.writeUInt64(deltix::dfp::Decimal64(71690000.0f).toUnderlying()); // volume
    loader->send();

    // send second message
    instrument = loader->getInstrumentId(InstrumentIdentity(InstrumentType::EQUITY, "GOOG"));
    loader->beginMessage(0, instrument, timestamp);
    writer.writeUTF8(std::string("NYSE")); // exchange id
    writer.writeUInt64(deltix::dfp::Decimal64(1215.56f).toUnderlying()); // close
    writer.writeUInt64(deltix::dfp::Decimal64(1205.3f).toUnderlying()); // open
    writer.writeUInt64(deltix::dfp::Decimal64(1254.76f).toUnderlying()); // high
    writer.writeUInt64(deltix::dfp::Decimal64(1200.00f).toUnderlying()); // low
    writer.writeUInt64(deltix::dfp::Decimal64(33700000.0f).toUnderlying()); // volume
    loader->send();

    // Close loader and connection
    loader->close();
    db->close();
}
/// <summary>
/// Writes two MyBarMessage bars (AAPL, then GOOG) into the existing "mybars"
/// stream, reusing one message instance for both sends.
/// </summary>
public void WriteStream()
{
    const string url = "dxtick://localhost:8011";

    // Step 1: connect; 'using' disposes the connection on exit.
    using (ITickDb timebase = TickDbFactory.CreateFromUrl(url))
    {
        // Writing requires read-write mode (false = not read-only).
        timebase.Open(false);

        // Step 2: look up the stream created earlier.
        ITickStream bars = timebase.GetStream("mybars");

        // Step 3: the loader is the write channel for the stream.
        using (ITickLoader sender = bars.CreateLoader())
        {
            // First bar: Apple, stamped 2020-03-09 00:00:00.
            MyBarMessage bar = new MyBarMessage
            {
                Timestamp = new HdDateTime(new DateTime(2020, 3, 9, 0, 0, 0)),
                Symbol = "AAPL",
                OpenPrice = 263.75,
                HighPrice = 278.09,
                LowPrice = 263.00,
                ClosePrice = 266.17,
                Volume = 71_690_000,
                Exchange = "NYSE"
            };

            Console.WriteLine("Sending 1st message: " + bar);
            sender.Send(bar);

            // Second bar: Google. Same instance; timestamp and exchange carry over.
            bar.Symbol = "GOOG";
            bar.OpenPrice = 1205.3;
            bar.HighPrice = 1254.76;
            bar.LowPrice = 1200.00;
            bar.ClosePrice = 1215.56;
            bar.Volume = 33_700_000;

            Console.WriteLine("Sending 2nd message: " + bar);
            sender.Send(bar);
        }
    }
}
Read
- Java
- Python
- C++
- .NET
/**
 * Reads every {@link MyBarMessage} stored for GOOG in the "mybars" stream and
 * prints each one to standard output.
 */
public void readStream() {
    final String url = "dxtick://localhost:8011";

    // Step 1: connect. try-with-resources closes the connection on exit.
    try (DXTickDB timebase = TickDBFactory.createFromUrl(url)) {
        // The connection must be opened before use.
        timebase.open(false);

        // Step 2: look up the stream created earlier.
        final DXTickStream bars = timebase.getStream("mybars");

        // Step 3: subscription filters. A null entity list would select all
        // entities of the stream; here only GOOG is requested.
        final String[] symbols = { "GOOG" };
        // Restrict the subscription to MyBarMessage, identified by its class name.
        final String[] messageTypes = { MyBarMessage.class.getName() };

        // Step 4: open a cursor starting from the earliest possible time.
        try (TickCursor reader = bars.select(Long.MIN_VALUE, new SelectionOptions(), messageTypes, symbols)) {
            // Step 5: walk the cursor until it is exhausted.
            while (reader.next()) {
                final MyBarMessage bar = (MyBarMessage) reader.getMessage();
                System.out.println("Read message: " + bar.toString());
            }
        }
    }
}
import calendar
from datetime import datetime

import dxapi

# Timebase URL specification, pattern is "dxtick://<host>:<port>"
timebase = 'dxtick://localhost:8011'

# Bind the name before entering the try block so that the finally clause can
# always reference it, even when createFromUrl() itself raises.
db = None
try:
    # Create timebase connection
    db = dxapi.TickDb.createFromUrl(timebase)
    # Open in read-only mode
    db.open(True)
    print('Connected to ' + timebase)

    # Define name of the stream
    streamKey = 'ticks'
    # Get stream from the timebase
    stream = db.getStream(streamKey)

    # Create cursor with empty subscription
    cursor = db.createCursor(None, dxapi.SelectionOptions())

    # Create subscription dynamically
    # 1. Add entities
    entities = [
        dxapi.InstrumentIdentity(dxapi.InstrumentType.EQUITY, 'AAA'),
        dxapi.InstrumentIdentity(dxapi.InstrumentType.EQUITY, 'AAPL')
    ]
    cursor.addEntities(entities)

    # 2. Subscribe to the Trade and BestBidOffer Messages
    types = [
        'deltix.timebase.api.messages.TradeMessage',
        'deltix.timebase.api.messages.BestBidOfferMessage'
    ]
    cursor.addTypes(types)

    # 3. Subscribe to the data stream(s)
    cursor.addStreams([stream])

    # Define subscription start time
    subscriptionStart = datetime(2010, 1, 1, 0, 0)
    # Start time is Epoch time in milliseconds
    startTime = calendar.timegm(subscriptionStart.timetuple()) * 1000

    # 4. Reset cursor to the subscription time
    cursor.reset(startTime)

    try:
        while cursor.next():
            message = cursor.getMessage()

            # Message time is Epoch time in nanoseconds; convert to seconds
            messageSeconds = message.timestamp / 1e9
            messageTime = datetime.utcfromtimestamp(messageSeconds)

            if message.typeName == 'deltix.timebase.api.messages.TradeMessage':
                print("Trade (" + str(messageTime) + "," + message.symbol + ") price: " + str(message.price))
            elif message.typeName == 'deltix.timebase.api.messages.BestBidOfferMessage':
                print("BBO (" + str(messageTime) + "," + message.symbol + ") bid price: " + str(message.bidPrice) + ", ask price: " + str(message.offerPrice))
    finally:
        # cursor should be closed anyway
        cursor.close()
        cursor = None
finally:
    # database connection should be closed anyway
    if db is not None and db.isOpen():
        db.close()
        print("Connection " + timebase + " closed.")
void read_stream() {
// Connect to TimeBase
unique_ptr<TickDb> db(TickDb::createFromUrl("dxtick://localhost:8011"));
db->open(false);
// Get stream
TickStream *stream = db->getStream("bars1min");
// Specify list of identities to filter
std::vector<InstrumentIdentity> identities = { InstrumentIdentity(InstrumentType::EQUITY, "AAPL") };
// Specify list of types to filter
std::vector<std::string> types = { "deltix.timebase.api.messages.BarMessage" };
// Create cursor
unique_ptr<TickCursor> cursor(stream->select(0, SelectionOptions(), &types, &identities));
InstrumentMessage header;
while (cursor->next(&header)) {
DataReader &reader = cursor->getReader();
int64_t timestamp = header.timestamp;
std::string symbol, exchange;
double open, close, high, low, volume;
symbol = *cursor->getSymbolName(header.entityId);
reader.readUTF8(exchange);
close = deltix::dfp::Decimal64::fromUnderlying(reader.readInt64()).toFloat64();
open = deltix::dfp::Decimal64::fromUnderlying(reader.readInt64()).toFloat64();
high = deltix::dfp::Decimal64::fromUnderlying(reader.readInt64()).toFloat64();
low = deltix::dfp::Decimal64::fromUnderlying(reader.readInt64()).toFloat64();
volume = deltix::dfp::Decimal64::fromUnderlying(reader.readInt64()).toFloat64();
std::cout << "Read message: " <<
" symbol: " << symbol <<
" exchange: " << exchange <<
", open: " << open << ", close: " << close <<
", high: " << high << ", low: " << low <<
", volume: " << volume <<
std::endl;
}
// Close cursor and connection
cursor->close();
db->close();
}
/// <summary>
/// Reads every MyBarMessage stored for GOOG in the "mybars" stream and prints
/// each one to the console.
/// </summary>
public void ReadStream()
{
    String connection = "dxtick://localhost:8011";

    // Create Timebase connection using connection string; 'using' disposes it on exit.
    using (ITickDb db = TickDbFactory.CreateFromUrl(connection))
    {
        // The connection must be opened before use.
        db.Open(false);

        // 2. Get already created stream from database
        String streamName = "mybars";
        ITickStream stream = db.GetStream(streamName);

        // 3.1. Define list of entities to subscribe (if null, all stream entities will be used)
        // Use GOOG
        IInstrumentIdentity[] ids = new IInstrumentIdentity[]
        {
            new ConstantInstrumentKey(InstrumentType.Equity, "GOOG")
        };

        // 3.2. Define list of types to subscribe - select only "MyBarMessage" messages
        // NOTE(review): Name is the unqualified type name; if MyBarMessage lives in a
        // namespace the stream schema may expect FullName instead — confirm.
        String[] types = new String[] { typeof(MyBarMessage).Name };

        // 3.3. Create a new 'cursor' to read messages
        using (ITickCursor cursor = stream.Select(DateTime.MinValue, new SelectionOptions(), types, ids))
        {
            // 3.4 Iterate cursor and get messages
            while (cursor.Next())
            {
                MyBarMessage message = (MyBarMessage) cursor.GetMessage();
                Console.WriteLine("Read message: " + message.ToString());
            }
        }
    }
}