Python API

InstrumentType

class InstrumentType(_object)

Represents instrument type.

Objects of this class can be created from an instrument type name (str) or an int value:

Example:

fxType = dxapi.InstrumentType('FX')
indexType = dxapi.InstrumentType(dxapi.InstrumentType.INDEX)

Possible values:

EQUITY
OPTION
FUTURE
BOND
FX
INDEX
ETF
CUSTOM
SIMPLE_OPTION
EXCHANGE
TRADING_SESSION
STREAM
DATA_CONNECTOR
EXCHANGE_TRADED_SYNTHETIC
SYSTEM
CFD
UNKNOWN

InstrumentIdentity

class InstrumentIdentity(_object)

Represents an instrument. Consists of an instrument type and an instrument symbol.

Example:

appleId = dxapi.InstrumentIdentity(dxapi.InstrumentType('EQUITY'), 'AAPL')
btcUsdId = dxapi.InstrumentIdentity(dxapi.InstrumentType.FX, 'BTC/USD')

StreamScope

class StreamScope(_object)

Determines the scope of a stream's durability, if any.

Example:

scope = dxapi.StreamScope('TRANSIENT')

Possible values:

DURABLE
EXTERNAL_FILE
TRANSIENT
RUNTIME

WriteMode

class WriteMode(_object)

Determines how a loader writes data into a stream:

  • APPEND - Adds only new data to a stream, without truncation.
  • REPLACE - Adds data to a stream and removes previous data older than the first message time [truncate(first message time + 1)].
  • REWRITE - Default. Adds data to a stream and removes previous data by truncating at the first message time [truncate(first message time)].
  • TRUNCATE - The stream is truncated every time the loader writes a message earlier than the last message time.

Example:

mode = dxapi.WriteMode('TRUNCATE')

Possible values:

APPEND
REPLACE
REWRITE
TRUNCATE

SelectionOptions

class SelectionOptions(_object)

Options for selecting data from a stream.

Example:

so = dxapi.SelectionOptions()
so._from = 0
so.to = 100000
so.useCompression = False
...

LoadingOptions

class LoadingOptions(_object)

Options for loading data into a stream.

Example:

lo = dxapi.LoadingOptions()
lo.writeMode = dxapi.WriteMode('TRUNCATE')
lo.space = 'myspace'
...

StreamOptions

class StreamOptions(_object)

Stream definition attributes.

Example:

so = dxapi.StreamOptions()
so.name = key
so.description = key
so.scope = dxapi.StreamScope('DURABLE')
so.distributionFactor = 1
so.highAvailability = False
so.polymorphic = False
so.metadata = schema

db.createStream(key, so)

QueryParameter

class QueryParameter(_object)

Input parameter definition for a prepared statement.

TickDb

class TickDb(_object)

The top-level interface to the Deltix Tick Database engine. Instances of this class are created by the static method createFromUrl:

db = dxapi.TickDb.createFromUrl('dxtick://localhost:8011')

or

db = dxapi.TickDb.createFromUrl('dxtick://localhost:8011', 'user', 'password')

createFromUrl

@staticmethod
def createFromUrl(url: str, user: str = None, password: str = None) -> "TickDb"

Creates a new database instance with the specified root folder, or URL.

Arguments:

  • url str - Connection URL.
  • user str - User.
  • password str - Password.

Returns:

  • TickDb - An un-opened TickDB instance.

openFromUrl

@staticmethod
@contextmanager
def openFromUrl(url: str, readonly: bool, user: str = None, password: str = None)

Creates a new database instance with the specified root folder, or URL, and opens it.

Arguments:

  • url str - Connection URL.
  • readonly bool - Open data store in read-only mode.
  • user str - User.
  • password str - Password.

Returns:

  • TickDb - An opened TickDB instance.

isReadOnly

def isReadOnly() -> bool

Determines whether the store is open as read-only.

isOpen

def isOpen() -> bool

Determines whether the store is open.

open

def open(readOnlyMode: bool) -> bool

Open the data store.

Arguments:

  • readOnlyMode bool - Open data store in read-only mode.

close

def close() -> None

Closes data store.
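Since a store opened with open must eventually be closed, the two calls can be paired in a context manager. The following is a minimal sketch rather than part of dxapi: `opened_db` is a hypothetical helper name, and it relies only on the open and close methods documented above. It does not interpret the bool returned by open, because its meaning is not stated here.

```python
from contextlib import contextmanager


@contextmanager
def opened_db(db, read_only=True):
    """Open `db` on entry and always close it on exit.

    `db` is expected to be an un-opened TickDb, for example the result of
    dxapi.TickDb.createFromUrl(...). Hypothetical helper, not part of dxapi.
    """
    db.open(read_only)
    try:
        yield db
    finally:
        # Runs on normal exit and when the body raises.
        db.close()
```

Usage would look like `with opened_db(dxapi.TickDb.createFromUrl(url)) as db: ...`, where `url` is your connection URL.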

format

def format() -> bool

Create a new object on disk and format internally. The data store is left open for read-write at the end of this method.

listStreams

def listStreams() -> 'list[TickStream]'

Enumerates existing streams.

Returns:

  • list[TickStream] - An array of existing stream objects.

getStream

def getStream(key: str) -> 'TickStream'

Looks up an existing stream by key.

Arguments:

  • key str - Identifies the stream.

Returns:

  • TickStream - A stream object, or None if the key was not found.

createStream

def createStream(key: str, options: StreamOptions) -> 'TickStream'

Creates a new stream within the database.

Arguments:

  • key str - A required key later used to identify the stream.
  • options StreamOptions - Options for creating the stream.

Returns:

  • TickStream - A new instance of TickStream.
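Because getStream returns None for an unknown key, a common pattern is to look a stream up first and create it only when it is missing. A minimal sketch, assuming `db` is an open TickDb and `options` is a prepared StreamOptions object; `get_or_create_stream` is a hypothetical helper name, not a dxapi function.

```python
def get_or_create_stream(db, key, options):
    """Return the stream stored under `key`, creating it if it is missing.

    Hypothetical helper built on the documented getStream/createStream calls.
    """
    stream = db.getStream(key)
    if stream is None:
        # getStream is documented to return None when the key is not found.
        stream = db.createStream(key, options)
    return stream
```

Note that the lookup and the creation are two separate calls, so this sketch makes no attempt to handle another client creating the same key in between.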

createFileStream

def createFileStream(key: str, dataFile: str) -> 'TickStream'

Creates a new stream mount to the given data file.

Arguments:

  • key str - A required key later used to identify the stream.
  • dataFile str - Path to the data file (on server side).

Returns:

  • TickStream - A new instance of TickStream.

createCursor

def createCursor(stream: 'TickStream', options: SelectionOptions) -> 'TickCursor'

Opens an initially empty cursor for reading data from multiple streams, according to the specified options. The messages are returned from the cursor strictly ordered by time. Within the same exact timestamp, the order of messages is undefined and may vary from call to call, i.e. it is non-deterministic.

The cursor is returned initially empty and must be reset. The TickCursor class provides methods for dynamically re-configuring the subscription, or jumping to a different timestamp.

Arguments:

  • stream TickStream - Stream from which data will be selected.
  • options SelectionOptions - Selection options.

Returns:

  • TickCursor - A cursor used to read messages.
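The create, subscribe, reset, read, close sequence described above can be sketched as a single function. This is an illustration under assumptions, not the library's own helper: `read_messages` and its `handle` and `limit` parameters are hypothetical, and only methods documented on this page are called.

```python
def read_messages(db, stream, options, timestamp, handle, limit=None):
    """Pass messages from `stream`, starting at `timestamp`, to `handle`.

    Returns the number of messages handled. Hypothetical helper.
    """
    cursor = db.createCursor(stream, options)
    try:
        # The cursor starts empty, so subscribe and position it first.
        cursor.subscribeToAllEntities()
        cursor.subscribeToAllTypes()
        cursor.reset(timestamp)
        count = 0
        # Check the limit before calling next() so no message is skipped.
        while (limit is None or count < limit) and cursor.next():
            handle(cursor.getMessage())
            count += 1
        return count
    finally:
        cursor.close()
```

Each message is handed to `handle` before the cursor advances, since this page does not state whether the object returned by getMessage stays valid after the next call to next.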

tryCursor

@contextmanager
def tryCursor(stream: 'TickStream', options: SelectionOptions) -> 'TickCursor'

Context manager version of createCursor. Usage:

with db.tryCursor(stream, options) as cursor:
    while cursor.next():
        message = cursor.getMessage()

select

def select(timestamp: int, streams: 'list[TickStream]', options: SelectionOptions, types: 'list[str]', entities: 'list[InstrumentIdentity]') -> 'TickCursor'

Opens a cursor for reading data from multiple streams, according to the specified options. The messages are returned from the cursor strictly ordered by time. Within the same exact time stamp, the order of messages is undefined and may vary from call to call, i.e. it is non-deterministic.

Note that the arguments of this method only determine the initial configuration of the cursor. The TickCursor class provides methods for dynamically re-configuring the subscription, or jumping to a different timestamp.

Arguments:

  • timestamp int - The start timestamp in millis.
  • streams list[TickStream] - Streams from which data will be selected.
  • options SelectionOptions - Selection options.
  • types list[str] - Specified message types to be subscribed. If None, then all types will be subscribed.
  • entities list[InstrumentIdentity] - Specified entities to be subscribed. If None, then all entities will be subscribed.

Returns:

  • TickCursor - A cursor used to read messages.

trySelect

@contextmanager
def trySelect(timestamp: int, streams: 'list[TickStream]', options: SelectionOptions, types: 'list[str]', entities: 'list[InstrumentIdentity]') -> 'TickCursor'

Context manager version of select. Usage:

with db.trySelect(timestamp, streams, options, types, entities) as cursor:
    while cursor.next():
        message = cursor.getMessage()

createLoader

def createLoader(stream: 'TickStream', options: LoadingOptions) -> 'TickLoader'

Creates a channel for loading data. The loader must be closed when the loading process is finished.

Arguments:

  • stream TickStream - stream for loading data.
  • options LoadingOptions - Loading options.

Returns:

  • TickLoader - created loader.
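Since the loader must be closed when loading is finished, a try/finally block keeps that guarantee even if sending fails part-way. A minimal sketch with a hypothetical helper name `load_all`; it uses only createLoader, send and close as documented on this page.

```python
def load_all(db, stream, options, messages):
    """Send every message in `messages` to `stream`; return how many were sent.

    Hypothetical helper, not part of dxapi.
    """
    loader = db.createLoader(stream, options)
    sent = 0
    try:
        for message in messages:
            loader.send(message)
            sent += 1
    finally:
        # close() is documented to flush buffered messages before closing.
        loader.close()
    return sent
```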

tryLoader

@contextmanager
def tryLoader(stream: 'TickStream', options: LoadingOptions) -> 'TickLoader'

Context manager version of createLoader. Usage:

with db.tryLoader(stream, options) as loader:
    loader.send(message)

executeQuery

def executeQuery(query: str, options: SelectionOptions = None, timestamp: int = JAVA_LONG_MIN_VALUE, entities: 'list[InstrumentIdentity]' = None, params: 'list[QueryParameter]' = []) -> 'TickCursor'

Executes a query and creates a message source for reading data from it, according to the specified options. The messages are returned from the cursor strictly ordered by time. Within the same exact timestamp, the order of messages is undefined and may vary from call to call, i.e. it is non-deterministic.

Arguments:

  • query str - Query text element.
  • options SelectionOptions - Selection options.
  • timestamp int - The start timestamp in millis.
  • entities list[InstrumentIdentity] - Specified entities to be subscribed. If None, then all entities will be subscribed.
  • params list[QueryParameter] - The parameter values of the query.

Returns:

  • TickCursor - An iterable message source to read messages.
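The returned cursor can be wrapped in a generator so that query results are consumed with a plain for loop and the cursor is closed once iteration finishes. This is a sketch under assumptions: `iter_query` is a hypothetical helper, and it calls executeQuery with the query text only, leaving the other parameters at their documented defaults.

```python
def iter_query(db, query):
    """Yield messages produced by `query`, closing the cursor afterwards.

    Hypothetical helper built on executeQuery, next, getMessage and close.
    """
    cursor = db.executeQuery(query)
    try:
        while cursor.next():
            yield cursor.getMessage()
    finally:
        # Reached when the generator is exhausted or explicitly closed.
        cursor.close()
```

A caller would write `for message in iter_query(db, 'select * from stream'): ...` and should process each message inside the loop body before the next iteration.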

tryExecuteQuery

@contextmanager
def tryExecuteQuery(query: str, options: SelectionOptions = None, timestamp: int = JAVA_LONG_MIN_VALUE, entities: 'list[InstrumentIdentity]' = None, params: 'list[QueryParameter]' = []) -> 'TickCursor'

Context manager version of executeQuery. Usage:

with db.tryExecuteQuery('select * from stream') as cursor:
    while cursor.next():
        message = cursor.getMessage()

TickStream

class TickStream(_object)

The stream is a time series of messages for a number of financial instruments ('entities'). Messages can be price bars, trade ticks, bid/offer ticks, or any of the many more built-in and user-defined types. In the simplest case, a database will have a single stream of data. Multiple streams can be used to represent data of different frequencies, or completely different factors. For instance, separate streams can represent 1-minute price bars and ticks for the same set of entities. Or, you can have price bars and volatility bars in separate streams.

Get stream:

```
stream = tickdb.getStream('stream_key')
```

List stream:

```
streams = tickdb.listStreams()
```

key

def key() -> str

Returns the key, which uniquely identifies the stream within its database.

name

def name() -> str

Returns a user-readable short name.
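As a small illustration of the key and name accessors, the sketch below builds an index of all streams in a database. `stream_names` is a hypothetical helper name; it assumes `db` is an open TickDb.

```python
def stream_names(db):
    """Map each stream key to its user-readable name.

    Hypothetical helper using listStreams, key and name.
    """
    return {stream.key(): stream.name() for stream in db.listStreams()}
```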

distributionFactor

def distributionFactor() -> int

Returns the target number of files to be used for storing data.

description

def description() -> str

Returns a user-readable multi-line description.

owner

def owner() -> str

Returns stream owner.

location

def location() -> str

Returns stream location.

metadata

def metadata() -> str

Returns stream schema (in xml format).

scope

def scope() -> StreamScope

Returns the stream scope.

highAvailability

def highAvailability() -> bool

Returns stream memory caching parameter. High availability durable streams are cached on startup.

unique

def unique() -> bool

Unique streams maintain an in-memory cache of recent messages. This concept assumes that stream messages will have some field(s) marked as primary key. The primary key may be a simple field (e.g. symbol) or composite (e.g. symbol and portfolio ID). For each key, the TimeBase runtime maintains a copy of the last message received for this key (cache). Each new consumer will receive a snapshot of the current cache at the beginning of a live data subscription.

polymorphic

def polymorphic() -> bool

Returns whether the stream is configured as polymorphic.

periodicity

def periodicity() -> str

Returns Stream periodicity, if known.

options

def options() -> StreamOptions

Returns stream options object.

describe

def describe() -> str

Returns stream DDL description.

setSchema

def setSchema(options: StreamOptions) -> bool

Changes stream schema.

Arguments:

  • options StreamOptions - Stream options, that contains new schema xml.

Returns:

  • bool - True, if schema was changed successfully.

listEntities

def listEntities(instrumentTypes: 'list[InstrumentType]' = None) -> 'list[InstrumentIdentity]'

Returns the entities contained in this stream, optionally restricted to the given instrument types.

Arguments:

  • instrumentTypes list[InstrumentType] - instrument types for which entities should be selected. If None, all entities of stream will be selected.

Returns:

  • list[InstrumentIdentity] - selected entities.

truncate

def truncate(timestamp: int, entities: 'list[InstrumentIdentity]' = None) -> bool

Truncates stream data for the given entities, starting from the given time.

Arguments:

  • timestamp int - Timestamp in millis. If the time is earlier than the stream start time, then all stream data will be deleted.
  • entities list[InstrumentIdentity] - A list of entities. If None, all stream entities will be used.

Returns:

  • bool - true, if stream was truncated successfully.

clear

def clear(entities: 'list[InstrumentIdentity]' = None) -> bool

Clear stream data for the given entities.

Arguments:

  • entities list[InstrumentIdentity] - A list of entities. If None, all stream entities will be used.

purge

def purge(timestamp: int) -> bool

Deletes stream data that is older than the specified time.

Arguments:

  • timestamp int - Purge time in milliseconds.

Returns:

  • bool - true, if stream was purged successfully.
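A retention policy such as "keep the last N days" reduces to computing a cutoff and passing it to purge. The sketch below assumes that purge times are milliseconds since the Unix epoch, which this page does not state explicitly; `purge_older_than` and `MILLIS_PER_DAY` are hypothetical names.

```python
import time

MILLIS_PER_DAY = 24 * 60 * 60 * 1000


def purge_older_than(stream, days, now_millis=None):
    """Purge data older than `days` days and return purge()'s result.

    Assumes epoch-millisecond timestamps. `now_millis` can be supplied to
    make the cutoff deterministic; it defaults to the current wall clock.
    """
    if now_millis is None:
        now_millis = int(time.time() * 1000)
    cutoff = now_millis - days * MILLIS_PER_DAY
    return stream.purge(cutoff)
```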

deleteStream

def deleteStream() -> bool

Deletes this stream

Returns:

  • bool - true, if stream was deleted successfully.

abortBackgroundProcess

def abortBackgroundProcess() -> bool

Aborts active background process if any exists

select

def select(timestamp: int, options: SelectionOptions, types: 'list[str]', entities: 'list[InstrumentIdentity]') -> 'TickCursor'

Opens a cursor for reading data from this stream, according to the specified options. The messages are returned from the cursor strictly ordered by time. Within the same exact time stamp, the order of messages is undefined and may vary from call to call, i.e. it is non-deterministic.

Note that the arguments of this method only determine the initial configuration of the cursor. The TickCursor interface provides methods for dynamically re-configuring the subscription, or jumping to a different timestamp.

Arguments:

  • timestamp int - The start timestamp in millis.
  • options SelectionOptions - Selection options.
  • types list[str] - Specified message types to be subscribed. If None, then all types will be subscribed.
  • entities list[InstrumentIdentity] - Specified entities to be subscribed. If None, then all entities will be subscribed.

Returns:

  • TickCursor - A cursor used to read messages.

trySelect

@contextmanager
def trySelect(timestamp: int, options: SelectionOptions, types: 'list[str]', entities: 'list[InstrumentIdentity]') -> 'TickCursor'

Context manager version of select. Usage:

with stream.trySelect(timestamp, options, types, entities) as cursor:
    while cursor.next():
        message = cursor.getMessage()

createCursor

def createCursor(options: SelectionOptions) -> 'TickCursor'

Creates a cursor for reading data from this stream, according to the specified options, but initially with a fully restricted filter. The user must call TickCursor.reset at least once, in order to begin retrieving data. This method is equivalent to (but is slightly more optimal than) calling createCursor(options)

Arguments:

  • options SelectionOptions - Selection Options.

Returns:

  • TickCursor - A cursor used to read messages. Never None.

tryCursor

@contextmanager
def tryCursor(options: SelectionOptions) -> 'TickCursor'

Context manager version of createCursor. Usage:

with stream.tryCursor(options) as cursor:
    while cursor.next():
        message = cursor.getMessage()

createLoader

def createLoader(options: LoadingOptions) -> 'TickLoader'

Creates a channel for loading data. The loader must be closed when the loading process is finished.

Arguments:

  • options LoadingOptions - Loading options.

Returns:

  • TickLoader - created loader.

tryLoader

@contextmanager
def tryLoader(options: LoadingOptions) -> 'TickLoader'

Context manager version of createLoader. Usage:

with stream.tryLoader(options) as loader:
    loader.send(message)

listSpaces

def listSpaces() -> 'list[str]'

Returns all created "spaces" for the stream. Default space returns as "" (empty string). If backing stream does not support spaces None will be returned.
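The two special cases above (an empty string for the default space, and None when spaces are unsupported) are easy to mishandle when printing or iterating. A minimal sketch that normalizes both; `space_labels` and the `"<default>"` label are hypothetical choices, not part of dxapi.

```python
def space_labels(stream, default_label="<default>"):
    """Return printable space names for `stream`.

    Yields an empty list when the stream does not support spaces, and
    replaces the default space's empty name with `default_label`.
    """
    spaces = stream.listSpaces()
    if spaces is None:
        return []
    return [default_label if name == "" else name for name in spaces]
```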

renameSpace

def renameSpace(newName: str, oldName: str) -> None

Rename existing space.

Arguments:

  • newName str - new space name.
  • oldName str - space to rename.

deleteSpaces

def deleteSpaces(spaces: 'list[str]') -> None

Removes the given spaces permanently.

Arguments:

  • spaces list[str] - list of spaces names to delete.

getTimeRange

def getTimeRange(entities: 'list[InstrumentIdentity]' = None) -> 'list[int]'

Return an inclusive range of times for which the specified entities have data in the database.

Arguments:

  • entities list[InstrumentIdentity] - A list of entities. If empty, return for all.

getSpaceTimeRange

def getSpaceTimeRange(space: str) -> 'list[int]'

Returns a list of two timestamps (from and to) for which the given space has data, or None if no data was found.

Arguments:

  • space str - space name.
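The two-element time range is easier to inspect as datetimes. The sketch below assumes the values are milliseconds since the Unix epoch in UTC (an assumption, as the unit's epoch is not stated here) and passes None through unchanged; `range_to_datetimes` is a hypothetical helper name.

```python
from datetime import datetime, timedelta, timezone

_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def range_to_datetimes(time_range):
    """Convert a [from, to] pair of epoch milliseconds to UTC datetimes.

    Returns None when `time_range` is None (no data was found).
    """
    if time_range is None:
        return None
    start, end = time_range
    # timedelta arithmetic keeps millisecond precision without float rounding.
    return (_EPOCH + timedelta(milliseconds=start),
            _EPOCH + timedelta(milliseconds=end))
```

For example, `range_to_datetimes(stream.getSpaceTimeRange('myspace'))` would give the first and last covered instants, or None for an empty space.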

TickCursor

class TickCursor(_object)

A cursor (also known as iterator, or result set) for reading data from a stream. This class provides methods for dynamically reconfiguring the feed, as well as method reset for essentially re-opening the cursor on a completely different timestamp.

To get a cursor, use the select method of a TickDb or TickStream object, or call executeQuery to open a cursor on a QQL result set.

A cursor can also be created with the createCursor method. Such a cursor is not initialized, so it must be configured with types and entities and then positioned at a read time by calling reset:

```
options = dxapi.SelectionOptions()
cursor = tickdb.createCursor(stream, options)
cursor.subscribeToAllEntities()
cursor.subscribeToAllTypes()
cursor.reset(timestamp)
```

next

def next() -> bool

Moves cursor on to the next message. This method blocks until the next message becomes available, or until the cursor is determined to be at the end of the sequence. This method is illegal to call if isAtEnd() returns true.

Returns:

  • bool - false if at the end of the cursor.

getMessage

def getMessage() -> 'InstrumentMessage'

Returns an InstrumentMessage object cursor points at.

isAtEnd

def isAtEnd() -> bool

Returns true if the last call to next() returned false. Returns false if next() has not been called yet. This method is legal to call any number of times at any point in the cursor's lifecycle.

nextIfAvailable

def nextIfAvailable() -> int

Moves the cursor on to the next message if one is available. Unlike next(), this method does not block until the next message becomes available.

Returns:

  • NextResult - OK (0) if new message is available, END_OF_CURSOR (1) if cursor was closed, otherwise, UNAVAILABLE (2)
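The three result codes suggest a polling loop: handle a message on OK, stop on END_OF_CURSOR, and back off briefly on UNAVAILABLE. The following is a sketch under assumptions, not a dxapi facility: the constants are defined locally from the values listed above, and `poll_cursor`, `idle_sleep` and `max_idle_polls` are hypothetical names.

```python
import time

# Result codes as listed in the nextIfAvailable documentation above.
OK, END_OF_CURSOR, UNAVAILABLE = 0, 1, 2


def poll_cursor(cursor, handle, idle_sleep=0.01, max_idle_polls=100):
    """Drain `cursor` without blocking inside the cursor itself.

    Returns True if the end of the cursor was reached, or False if
    `max_idle_polls` consecutive polls found no message available.
    """
    idle_polls = 0
    while idle_polls < max_idle_polls:
        state = cursor.nextIfAvailable()
        if state == OK:
            handle(cursor.getMessage())
            idle_polls = 0  # a message arrived, so restart the idle count
        elif state == END_OF_CURSOR:
            return True
        else:
            idle_polls += 1
            time.sleep(idle_sleep)
    return False
```

Sleeping between idle polls avoids spinning the CPU; a real application might instead do other work in that branch.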

isClosed

def isClosed() -> bool

Returns true, if cursor was closed

close

def close() -> None

Close the cursor

getCurrentStreamKey

def getCurrentStreamKey() -> str

Return the key of the stream that is the source of the current message.

reset

def reset(timestamp: int, entities: 'list[InstrumentIdentity]' = None) -> None

Reposition the message source to a new point in time, while preserving current subscription.

Arguments:

  • timestamp int - The new position in time in millis.
  • entities 'list[InstrumentIdentity]' - list of entities to reset

subscribeToAllEntities

def subscribeToAllEntities() -> None

Subscribe to all available entities.

clearAllEntities

def clearAllEntities() -> None

Switch to selective subscription mode (if necessary) and clear the list.

addEntity

def addEntity(entity: InstrumentIdentity) -> None

Add the specified entity to subscription. The type and symbol are copied from the incoming object, if necessary, so the argument can be re-used after the call.

Special note about options: the following fragment subscribes to the specific option contract "DAV 100417P00085000":

cursor.addEntity(dxapi.InstrumentIdentity(dxapi.InstrumentType.OPTION, 'DAV 100417P00085000'))

The following fragment subscribes to the option root instead (and you will get all instruments with this root):

cursor.addEntity(dxapi.InstrumentIdentity(dxapi.InstrumentType.OPTION, 'DAV '))

addEntities

def addEntities(entities: 'list[InstrumentIdentity]') -> None

Bulk add the specified entities to subscription. The type and symbol are copied from the incoming objects, if necessary, so the arguments can be re-used after the call.

removeEntities

def removeEntities(entities: 'list[InstrumentIdentity]') -> None

Remove the specified entities from subscription. The type and symbol are copied from the incoming objects, if necessary, so the arguments can be re-used after the call.

removeEntity

def removeEntity(entity: InstrumentIdentity) -> None

Remove the specified entity from subscription. The type and symbol are copied from the incoming object, if necessary, so the argument can be re-used after the call.

subscribeToAllTypes

def subscribeToAllTypes() -> None

Subscribe to all available types (no filtering).

addTypes

def addTypes(types: 'list[str]') -> None

Add the specified type names to subscription.

removeTypes

def removeTypes(types: 'list[str]') -> None

Remove the specified types from subscription.

setTypes

def setTypes(types: 'list[str]') -> None

Subscribe to specified types.

add

def add(types: 'list[str]', entities: 'list[InstrumentIdentity]') -> None

Add the specified entities and types to subscription. The type and symbol are copied from the incoming object, if necessary, so the argument can be re-used after the call.

Arguments:

  • types list[str] - not-null array of type names to subscribe.
  • entities list[InstrumentIdentity] - not-null array of instruments to subscribe.

remove

def remove(types: 'list[str]', entities: 'list[InstrumentIdentity]') -> None

Remove the specified entities and types from subscription. The type and symbol are copied from the incoming objects, if necessary, so the arguments can be re-used after the call.

Arguments:

  • types list[str] - not-null array of type names to unsubscribe.
  • entities list[InstrumentIdentity] - not-null array of instruments to unsubscribe.

addStreams

def addStreams(streams: 'list[TickStream]') -> None

Add streams to subscription. Current time and filter is used to query data from new sources.

Arguments:

  • streams 'list[TickStream]' - Streams to add.

removeStreams

def removeStreams(streams: 'list[TickStream]') -> None

Remove streams from subscription.

Arguments:

  • streams list[TickStream] - Streams to remove.

removeAllStreams

def removeAllStreams() -> None

Remove all streams from subscription.

setTimeForNewSubscriptions

def setTimeForNewSubscriptions(timestamp: int) -> None

This method affects subsequent "add subscription" methods, such as, for instance, addEntity(). New subscriptions start at the specified time.

Arguments:

  • timestamp int - The time to use.
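Because the start time only affects subscriptions added afterwards, the order of the two calls matters. A minimal sketch with a hypothetical helper name `subscribe_from`; it uses only setTimeForNewSubscriptions and addEntities as documented above.

```python
def subscribe_from(cursor, entities, timestamp):
    """Add `entities` to the subscription, starting their data at `timestamp`.

    Hypothetical helper. The time must be set before the entities are added,
    since it applies only to subsequent "add subscription" calls.
    """
    cursor.setTimeForNewSubscriptions(timestamp)
    cursor.addEntities(entities)
```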

TickLoader

class TickLoader(_object)

Object which consumes messages.

Create loader from TickDb:

options = dxapi.LoadingOptions()
loader = tickdb.createLoader(stream, options)

Create loader from TickStream:

options = dxapi.LoadingOptions()
loader = stream.createLoader(options)

send

def send(message: InstrumentMessage) -> None

This method is invoked to send a message to the object.

Arguments:

  • message InstrumentMessage - A temporary buffer with the message. By convention, the message is only valid for the duration of this call.

flush

def flush() -> None

Flushes all buffered messages by sending them to the server. Note that calling send alone does not guarantee that all messages have been delivered to and stored on the server.
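One way to bound how much data sits in the loader's buffer is to flush after every fixed number of sends. This is a sketch of that idea under assumptions: `send_in_batches` and `batch_size` are hypothetical, and whether periodic flushing is beneficial depends on buffering behavior that this page does not describe.

```python
def send_in_batches(loader, messages, batch_size=1000):
    """Send `messages`, calling flush() after every `batch_size` sends.

    Returns the total number of messages sent. Hypothetical helper.
    """
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    total = 0
    pending = 0
    for message in messages:
        loader.send(message)
        total += 1
        pending += 1
        if pending == batch_size:
            loader.flush()
            pending = 0
    if pending:
        # Flush the final, partially filled batch.
        loader.flush()
    return total
```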

close

def close() -> None

Flushes and closes the loader

addListener

def addListener(listener: 'ErrorListener') -> None

Register error listener. All writing data errors will be delivered to the listener.

Arguments:

  • listener ErrorListener - error listener to register.

removeListener

def removeListener(listener: 'ErrorListener') -> None

Unsubscribe registered error listener.

Arguments:

  • listener ErrorListener - error listener to unsubscribe.

nErrorListeners

def nErrorListeners() -> int

Returns number of registered error listeners

registerType

def registerType(type: str) -> int

Registers the type of the messages to be sent and returns its type id. For performance reasons, you can specify the type id instead of the type name, for example:

message = dxapi.InstrumentMessage()
message.typeId = loader.registerType("deltix.timebase.api.messages.universal.PackageHeader")
# as an alternative, you could write:
# message.typeName = "deltix.timebase.api.messages.universal.PackageHeader"
loader.send(message)

Arguments:

  • type str - name of type to register.

Returns:

  • int - id of registered type.
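When a loader sends many messages of a few types, the ids returned by registerType can be remembered so that each type name is registered once. A minimal sketch: `TypeIdCache` is a hypothetical class, not part of dxapi, and it assumes that an id stays valid for the lifetime of the loader that issued it.

```python
class TypeIdCache:
    """Remember type ids returned by a loader's registerType method.

    Hypothetical helper. One cache instance should be used per loader.
    """

    def __init__(self, loader):
        self._loader = loader
        self._ids = {}

    def get(self, type_name):
        """Return the id for `type_name`, registering it on first use."""
        if type_name not in self._ids:
            self._ids[type_name] = self._loader.registerType(type_name)
        return self._ids[type_name]
```

A caller would then set `message.typeId = cache.get(type_name)` before each send, as in the example above.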

registerInstrument

def registerInstrument(type: InstrumentType, symbol: str) -> int

Registers the instrument of the messages to be sent and returns its instrument id. For performance reasons, you can specify the instrument id instead of the symbol and instrument type, for example:

message = dxapi.InstrumentMessage()
message.instrumentId = loader.registerInstrument(dxapi.InstrumentType.EQUITY, 'AAPL')
# as an alternative, you could write:
# message.instrumentType = 'EQUITY'
# message.symbol = 'AAPL'
loader.send(message)

Arguments:

  • type InstrumentType - type of instrument.
  • symbol str - instrument ticker.

Returns:

  • int - id of registered instrument.

registerInstrumentIdentity

def registerInstrumentIdentity(instrument: InstrumentIdentity) -> int

Registers the instrument of the messages to be sent, given as an InstrumentIdentity, and returns its instrument id. For performance reasons, you can specify the instrument id instead of the symbol and instrument type, for example:

message = dxapi.InstrumentMessage()
message.instrumentId = loader.registerInstrumentIdentity(dxapi.InstrumentIdentity(dxapi.InstrumentType.EQUITY, 'AAPL'))
# as an alternative, you could write:
# message.instrumentType = 'EQUITY'
# message.symbol = 'AAPL'
loader.send(message)

Arguments:

  • instrument InstrumentIdentity - identity (type and symbol) of the instrument to register.

Returns:

  • int - id of registered instrument.