dxapi

InstrumentMessage

class InstrumentMessage(object)

setTimestamp

def setTimestamp(timestamp: int)

Set timestamp for message.

Arguments:

  • timestamp - int, epoch time in millis.

setDateTime

def setDateTime(timestamp: datetime)

Set timestamp for message from datetime.

Arguments:

  • timestamp - datetime object.

getDateTime

def getDateTime()

Returns timestamp of message as datetime object.

setNanoTime

def setNanoTime(timestamp: int)

Set timestamp for message.

Arguments:

  • timestamp - int, epoch time in nanos.

getNanoTime

def getNanoTime()

Returns timestamp of message (epoch time in nanos).
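Because setTimestamp expects epoch milliseconds and setNanoTime expects epoch nanoseconds, a conversion helper can be handy when starting from a datetime. The following is a minimal sketch using only the standard library. The helper names to_epoch_millis and to_epoch_nanos are hypothetical (they are not part of dxapi), and the input is assumed to be a timezone-aware datetime.

```python
from datetime import datetime, timezone

# Reference point for epoch arithmetic (hypothetical helper constant).
_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def _require_aware(dt: datetime) -> None:
    # Naive datetimes are rejected so the result never depends on local time.
    if dt.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")


def to_epoch_millis(dt: datetime) -> int:
    """Convert an aware datetime to whole milliseconds since the epoch."""
    _require_aware(dt)
    delta = dt - _EPOCH
    # Integer arithmetic only, to avoid floating-point rounding.
    return (delta.days * 86_400 + delta.seconds) * 1_000 + delta.microseconds // 1_000


def to_epoch_nanos(dt: datetime) -> int:
    """Convert an aware datetime to nanoseconds since the epoch.

    datetime has microsecond resolution, so the last three digits are zero.
    """
    _require_aware(dt)
    delta = dt - _EPOCH
    return ((delta.days * 86_400 + delta.seconds) * 1_000_000 + delta.microseconds) * 1_000
```

The results could then be passed to setTimestamp and setNanoTime respectively.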

InstrumentType

class InstrumentType(object)

Represents instrument type.

Object of this class can be created from instrument type name (str), or int value:

Example:

```
fxType = dxapi.InstrumentType('FX')
indexType = dxapi.InstrumentType(dxapi.InstrumentType.INDEX)
```

Possible values:

```
EQUITY
OPTION
FUTURE
BOND
FX
INDEX
ETF
CUSTOM
SIMPLE_OPTION
EXCHANGE
TRADING_SESSION
STREAM
DATA_CONNECTOR
EXCHANGE_TRADED_SYNTHETIC
SYSTEM
CFD
UNKNOWN
```

InstrumentIdentity

class InstrumentIdentity(object)

Represents an instrument. Consists of an instrument type and an instrument symbol.

Example:

```
appleId = dxapi.InstrumentIdentity(dxapi.InstrumentType('EQUITY'), 'AAPL')
btcUsdId = dxapi.InstrumentIdentity(dxapi.InstrumentType.FX, 'BTC/USD')
```

StreamScope

class StreamScope(object)

Determines the scope of a stream's durability, if any.

Example:

```
scope = dxapi.StreamScope('TRANSIENT')
```

Possible values:

```
DURABLE,
EXTERNAL_FILE,
TRANSIENT,
RUNTIME
```

WriteMode

class WriteMode(object)

Determines how a loader writes data into a stream:

  • APPEND - Adds only new data into a stream, without truncation.
  • REPLACE - Adds data into a stream and removes previous data older than the first message time [truncate(first message time + 1)].
  • REWRITE - Default. Adds data into a stream and removes previous data by truncating at the first message time [truncate(first message time)].
  • TRUNCATE - The stream is truncated every time the loader writes a message earlier than the last message time.
  • INSERT - New data is inserted into a stream without truncation.

Example:

```
mode = dxapi.WriteMode('TRUNCATE')
```

Possible values:

```
APPEND,
REPLACE,
REWRITE,
TRUNCATE,
INSERT
```

SelectionOptions

class SelectionOptions(object)

Options for selecting data from a stream.

Example:

```
so = dxapi.SelectionOptions()
so._from = 0
so.to = 100000
so.useCompression = False
so.live = True
so.reverse = False
so.allowLateOutOfOrder = True
so.realTimeNotification = True
so.minLatency = False
```

Properties:

  • _from int - Start timestamp in millis.
  • to int - End timestamp in millis.
  • useCompression bool - Use compression.
  • live bool - Instead of returning false from next() at the end of the stream, wait for live data to be added.
  • reverse bool - Specify cursor direction.
  • allowLateOutOfOrder bool - Output late out-of-order messages. TimeBase consumers receive the historical messages they requested strictly ordered by time. When new messages arrive in the middle of a consumer's session (so-called 'live' mode), a newly arrived message may have a timestamp in a time region the consumer has already read. In this case, this flag allows the consumer to receive such 'late' messages even if they are out of order with respect to the current baseline time.
  • NOTE - Late messages that are timestamped prior to the consumer's select time or last reset time will not be delivered even with this flag enabled.
  • realTimeNotification bool - Enables/disables sending system messages when the cursor switches from historical to real-time mode.
  • minLatency bool - try to receive messages ASAP, with minimal buffering. Can potentially reduce max throughput and increase CPU use / network bandwidth usage.
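The delivery rule described for allowLateOutOfOrder can be summarized as a small decision function. This is an illustrative model only, not dxapi code: the function name and its parameters are hypothetical, and it assumes that a late message is simply not delivered when the flag is off.

```python
def is_delivered(message_time: int, baseline_time: int, select_time: int,
                 allow_late_out_of_order: bool) -> bool:
    """Model whether a live consumer receives a newly arrived message.

    message_time  - timestamp of the newly arrived message (ms)
    baseline_time - time the consumer has already read up to (ms)
    select_time   - the consumer's select time or last reset time (ms)
    """
    if message_time >= baseline_time:
        # In order with respect to what was already consumed.
        return True
    if message_time < select_time:
        # Prior to the select/reset time: never delivered, per the NOTE above.
        return False
    # Late message inside the already consumed region: the flag decides.
    return allow_late_out_of_order
```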

withSpaces

def withSpaces(spaces: 'list[str]') -> None

List of spaces to select data from. If set to None then data from all spaces is loaded.

LoadingOptions

class LoadingOptions(object)

Options for loading data into a stream.

Example:

```
lo = dxapi.LoadingOptions()
lo.writeMode = dxapi.WriteMode('TRUNCATE')
lo.space = 'myspace'
lo.minLatency = False
```

Properties:

  • writeMode WriteMode - see WriteMode class description.
  • minLatency bool - try to send messages ASAP, with minimal buffering. Can potentially reduce max throughput and increase CPU use / network bandwidth usage.
  • space str - Data Partition. Contains unique number of instruments or Time Ranges.

StreamOptions

class StreamOptions(object)

Stream definition attributes.

Example:

```
so = dxapi.StreamOptions()
so.name('Test Name')
so.description('Test Description')
so.owner('Test Owner')
so.metadata(schema)
so.scope = dxapi.StreamScope('TRANSIENT')
so.distributionFactor = 1
so.polymorphic = True
so.periodicity = 'STATIC'
so.unique = True
so.duplicatesAllowed = False
so.version = '4.3'

db.createStream(key, so)
```

Properties:

  • scope StreamScope - Determines persistent properties of a stream.

  • distributionFactor int - The number of M-files into which to distribute the data. Supply MAX_DISTRIBUTION to keep a separate file for each instrument (default).

  • duplicatesAllowed bool - Indicates that the loader will ignore binary-similar messages (for 'unique' streams only).

  • highAvailability bool - High availability durable streams are cached on startup.

  • unique bool - Unique streams maintain an in-memory cache of recent messages. This concept assumes that stream messages have some field(s) marked as the primary key. The primary key may be a simple field (e.g. symbol) or composite (e.g. symbol and portfolio ID). For each key, the TimeBase runtime maintains a copy of the last message received for this key (cache). Each new consumer receives a snapshot of the current cache at the beginning of a live data subscription.

  • polymorphic bool - Whether the stream is polymorphic.

  • periodicity str - Stream periodicity, if known.

  • version str - Stream format version. Supported versions are: '5.0' - Editable past format, TS data file; '4.3' - 'Classic' storage format, allocation free.
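The caching behavior described for unique streams can be pictured with a small in-memory model. This sketch is not how TimeBase implements it. LastMessageCache is a hypothetical class, and messages are modeled as plain dicts purely for illustration.

```python
class LastMessageCache:
    """Toy model of a per-key 'last message' cache (hypothetical, not dxapi)."""

    def __init__(self, key_fields):
        # Names of the fields that form the (simple or composite) primary key.
        self._key_fields = tuple(key_fields)
        self._last = {}

    def update(self, message: dict) -> None:
        # Keep only the most recent message for each primary key value.
        key = tuple(message[field] for field in self._key_fields)
        self._last[key] = message

    def snapshot(self) -> list:
        # What a new consumer would receive before live data starts flowing.
        return list(self._last.values())


cache = LastMessageCache(['symbol', 'portfolio'])
cache.update({'symbol': 'AAPL', 'portfolio': 1, 'size': 10})
cache.update({'symbol': 'MSFT', 'portfolio': 1, 'size': 5})
cache.update({'symbol': 'AAPL', 'portfolio': 1, 'size': 25})  # replaces the first entry
```

After these updates the snapshot holds two messages, one per key, with the AAPL entry carrying the latest size.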

name

def name(name: str = None) -> None

Optional user-readable name.

description

def description(description: str = None) -> None

Optional multi-line description.

owner

def owner(owner: str = None) -> None

Optional owner of the stream. During stream creation it is set to the authenticated user name.

location

def location(location: str = None) -> None

Location of the stream (None by default). When defined, this attribute provides an alternative stream location (rather than the default location under QuantServerHome).

distributionRuleName

def distributionRuleName(distributionRuleName: str = None) -> None

Class name of the distribution rule

metadata

def metadata(metadata: 'SchemaDef or str(json)' = None) -> 'SchemaDef'

Stream metadata as dxapi.SchemaDef object.

schema

def schema(metadata: 'SchemaDef or str(json)' = None) -> 'SchemaDef'

Stream metadata as dxapi.SchemaDef object. Alias for metadata().

metadataXml

def metadataXml(metadataXml: str = None) -> None

Stream metadata in XML format.

QueryParameter

class QueryParameter(object)

Input parameter definition for a prepared statement.

LockType

class LockType(object)

Represents lock type.

Object of this class can be created from lock type name (str), or int value:

Example:

```
readLockType = dxapi.LockType('READ')
writeLockType = dxapi.LockType('WRITE')
writeLockType = dxapi.LockType(dxapi.LockType.READ_LOCK)
```

Possible values:

```
READ,
WRITE
```

LockOptions

class LockOptions(object)

Options for stream locking.

Example:

```
lo = dxapi.LockOptions()
lo.type = dxapi.LockType('WRITE')
lo.startTime = 0
lo.endTime = 1000
```

Properties:

  • type LockType - type of lock;
  • startTime int - Start time for ranged write locks (ms);
  • endTime int - End time for ranged write locks (ms).

ExecutionStatus

class ExecutionStatus(object)

Background process status.

Object of this class can be created from status name (str), or int value:

Possible values:

```
NONE,
RUNNING,
COMPLETED,
ABORTED,
FAILED
```

BackgroundProcessInfo

class BackgroundProcessInfo(object)

Background process info.

Example:

```
import time

# Start the schema change in background mode, then poll until it completes.
stream.changeSchema(schema, None, defaults, True)
while str(stream.backgroundProcessInfo().status) != 'Completed':
    print('Waiting for the task to finish...')
    time.sleep(1)
```

Properties:

  • status ExecutionStatus - status of process;
  • progress double - progress of process (from 0.0 to 1.0);
  • startTime int - task start time in millis;
  • endTime int - task end time in millis.
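A polling loop like the one above never exits if the task ends as ABORTED or FAILED. The sketch below adds a timeout and treats all three terminal statuses listed under ExecutionStatus as exit conditions. The helper wait_for_background_process is hypothetical. It takes any zero-argument callable, and it assumes that str() of a status yields the status name in some letter case, which is why the comparison is case-insensitive.

```python
import time

# Terminal statuses, taken from the ExecutionStatus values listed above.
TERMINAL_STATUSES = {'COMPLETED', 'ABORTED', 'FAILED'}


def wait_for_background_process(get_status, timeout_s=60.0, poll_s=1.0):
    """Poll get_status() until a terminal status or until the timeout expires.

    get_status - zero-argument callable returning the current status, e.g.
                 lambda: stream.backgroundProcessInfo().status
    Returns the terminal status name in upper case.
    Raises TimeoutError if no terminal status is seen in time.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        status = str(get_status()).upper()
        if status in TERMINAL_STATUSES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f'background process still {status} after {timeout_s}s')
        time.sleep(poll_s)
```

The caller can then branch on the returned name, for example to report a FAILED schema change.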

TickDb

class TickDb(object)

The top-level interface to the methods of the Deltix Tick Database engine. Instances of this class are created by the static method createFromUrl:

db = dxapi.TickDb.createFromUrl('dxtick://localhost:8011')

or

db = dxapi.TickDb.createFromUrl('dxtick://localhost:8011', 'user', 'password')

createFromUrl

@staticmethod
def createFromUrl(url: str,
user: str = None,
password: str = None) -> "TickDb"

Creates a new database instance with the specified root folder, or URL.

Arguments:

  • url str - Connection URL.
  • user str - User.
  • password str - Password.

Returns:

  • TickDb - An un-opened TickDB instance.

openFromUrl

@staticmethod
@contextmanager
def openFromUrl(url: str,
readonly: bool,
user: str = None,
password: str = None)

Creates a new database instance with the specified root folder, or URL, and opens it.

Arguments:

  • url str - Connection URL.
  • readonly bool - Open data store in read-only mode.
  • user str - User.
  • password str - Password.

Returns:

  • TickDb - An opened TickDB instance.

setApplicationName

def setApplicationName(name: str) -> None

Set custom visible name of application.

Arguments:

  • name str - Application name.

isReadOnly

def isReadOnly() -> bool

Determines whether the store is open as read-only.

isOpen

def isOpen() -> bool

Determines whether the store is open.

open

def open(readOnlyMode: bool) -> bool

Open the data store.

Arguments:

  • readOnlyMode bool - Open data store in read-only mode.

close

def close() -> None

Closes data store.

format

def format() -> bool

Create a new object on disk and format internally. The data store is left open for read-write at the end of this method.

generateSchema

def generateSchema(types: 'list[str]') -> str

Generates SchemaDef object from specified content types.

schema = db.generateSchema(['deltix.timebase.api.messages.TradeMessage', 
'deltix.timebase.api.messages.BestBidOfferMessage'])

listStreams

def listStreams() -> 'list[TickStream]'

Enumerates existing streams.

Returns:

  • list[TickStream] - An array of existing stream objects.

getStream

def getStream(key: str) -> 'TickStream'

Looks up an existing stream by key.

Arguments:

  • key str - Identifies the stream.

Returns:

  • TickStream - A stream object, or None if the key was not found.

createStream

def createStream(key: str, options: StreamOptions) -> 'TickStream'

Creates a new stream within the database.

Arguments:

  • key str - A required key later used to identify the stream.
  • options StreamOptions - Options for creating the stream.

Returns:

  • TickStream - A new instance of TickStream.

createFileStream

def createFileStream(key: str, dataFile: str) -> 'TickStream'

Creates a new stream mount to the given data file.

Arguments:

  • key str - A required key later used to identify the stream.
  • dataFile str - Path to the data file (on server side).

Returns:

  • TickStream - A new instance of TickStream.

createCursor

def createCursor(stream: 'TickStream',
options: SelectionOptions) -> 'TickCursor'

Opens an initially empty cursor for reading data from multiple streams, according to the specified options. The messages are returned from the cursor strictly ordered by time. Within the same exact timestamp, the order of messages is undefined and may vary from call to call, i.e. it is non-deterministic.

The cursor is returned initially empty and must be reset. The TickCursor class provides methods for dynamically re-configuring the subscription, or jumping to a different timestamp.

Arguments:

  • stream TickStream - Stream from which data will be selected.
  • options SelectionOptions - Selection options.

Returns:

  • TickCursor - A cursor used to read messages.

tryCursor

@contextmanager
def tryCursor(stream: 'TickStream', options: SelectionOptions) -> 'TickCursor'

Context manager version of createCursor. Usage:

with db.tryCursor(stream, options) as cursor:
    while cursor.next():
        message = cursor.getMessage()
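How tryCursor is implemented is not shown here. The general pattern of such a wrapper can be sketched as follows, assuming it does nothing more than close the cursor on exit. closing_cursor is a hypothetical helper, not a dxapi function, and works with any object that has a close() method.

```python
from contextlib import contextmanager


@contextmanager
def closing_cursor(create_cursor, *args):
    """Create a cursor via create_cursor(*args) and always close it on exit."""
    cursor = create_cursor(*args)
    try:
        yield cursor
    finally:
        # Runs on normal exit and when the with-body raises.
        cursor.close()
```

With this pattern the cursor is released even if reading fails part-way through, which is the main reason to prefer the context manager form over a bare createCursor call.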

select

def select(timestamp: int, streams: 'list[TickStream]',
options: SelectionOptions, types: 'list[str]',
entities: 'list[InstrumentIdentity] or list[str]') -> 'TickCursor'

Opens a cursor for reading data from multiple streams, according to the specified options. The messages are returned from the cursor strictly ordered by time. Within the same exact time stamp, the order of messages is undefined and may vary from call to call, i.e. it is non-deterministic.

Note that the arguments of this method only determine the initial configuration of the cursor. The TickCursor class provides methods for dynamically re-configuring the subscription, or jumping to a different timestamp.

Arguments:

  • timestamp int - The start timestamp in millis.
  • streams list[TickStream] - Streams from which data will be selected.
  • options SelectionOptions - Selection options.
  • types list[str] - Specified message types to be subscribed. If None, then all types will be subscribed.
  • entities list[InstrumentIdentity] or list[str] - Specified entities to be subscribed. If None, then all entities will be subscribed.

Returns:

  • TickCursor - A cursor used to read messages.
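The ordering guarantee for a multi-stream cursor can be illustrated with a plain time-ordered merge. This is a model of the observable ordering, not of how TimeBase reads data. Messages are represented as hypothetical (timestamp, payload) tuples and merge_by_time is not a dxapi function.

```python
import heapq


def merge_by_time(*streams):
    """Merge iterables of (timestamp, payload) tuples into one time-ordered iterator.

    Each input must already be sorted by timestamp.
    """
    return heapq.merge(*streams, key=lambda message: message[0])


bars = [(1000, 'bar-1'), (3000, 'bar-2')]
ticks = [(500, 'tick-1'), (2000, 'tick-2'), (3000, 'tick-3')]
merged = list(merge_by_time(bars, ticks))
# Timestamps come out as 500, 1000, 2000, 3000, 3000.
```

Note that heapq.merge breaks ties by input order, whereas for a real cursor the order of messages sharing the same timestamp is undefined, so code should not depend on it.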

trySelect

@contextmanager
def trySelect(
timestamp: int, streams: 'list[TickStream]', options: SelectionOptions,
types: 'list[str]',
entities: 'list[InstrumentIdentity] or list[str]') -> 'TickCursor'

Contextmanager version of select. Usage:

with db.trySelect(timestamp, streams, options, types, entities) as cursor:
    while cursor.next():
        message = cursor.getMessage()

createLoader

def createLoader(stream: 'TickStream',
options: LoadingOptions) -> 'TickLoader'

Creates a channel for loading data. The loader must be closed when the loading process is finished.

Arguments:

  • stream TickStream - stream for loading data.
  • options LoadingOptions - Loading options.

Returns:

  • TickLoader - created loader.

tryLoader

@contextmanager
def tryLoader(stream: 'TickStream', options: LoadingOptions) -> 'TickLoader'

Contextmanager version of createLoader. Usage:

with db.tryLoader(stream, options) as loader:
    loader.send(message)

executeQuery

def executeQuery(query: str,
options: SelectionOptions = None,
timestamp: int = JAVA_LONG_MIN_VALUE,
entities: 'list[InstrumentIdentity] or list[str]' = None,
params: 'list[QueryParameter]' = []) -> 'TickCursor'

Executes a query and creates a message source for reading data from it, according to the specified options. The messages are returned from the cursor strictly ordered by time. Within the same exact timestamp, the order of messages is undefined and may vary from call to call, i.e. it is non-deterministic.

Arguments:

  • query str - Query text element.
  • options SelectionOptions - Selection options.
  • timestamp int - The start timestamp in millis.
  • entities list[InstrumentIdentity] or list[str] - Specified entities to be subscribed. If None, then all entities will be subscribed.
  • params list[QueryParameter] - The parameter values of the query.

Returns:

  • TickCursor - An iterable message source to read messages.

tryExecuteQuery

@contextmanager
def tryExecuteQuery(query: str,
options: SelectionOptions = None,
timestamp: int = JAVA_LONG_MIN_VALUE,
entities: 'list[InstrumentIdentity] or list[str]' = None,
params: 'list[QueryParameter]' = []) -> 'TickCursor'

Contextmanager version of executeQuery. Usage:

with db.tryExecuteQuery('select * from stream') as cursor:
    while cursor.next():
        message = cursor.getMessage()

TickStream

class TickStream(object)

The stream is a time series of messages for a number of financial instruments ('entities'). Messages can be price bars, trade ticks, bid/offer ticks, or any of the many more built-in and user-defined types. In the simplest case, a database will have a single stream of data. Multiple streams can be used to represent data of different frequencies, or completely different factors. For instance, separate streams can represent 1-minute price bars and ticks for the same set of entities. Or, you can have price bars and volatility bars in separate streams.

Get stream:

```
stream = tickdb.getStream('stream_key')
```

List stream:

```
streams = tickdb.listStreams()
```

key

def key() -> str

Returns the key, which uniquely identifies the stream within its database.

name

def name() -> str

Returns a user-readable short name.

distributionFactor

def distributionFactor() -> int

Returns the target number of files to be used for storing data.

description

def description() -> str

Returns a user-readable multi-line description.

owner

def owner() -> str

Returns stream owner.

location

def location() -> str

Returns stream location.

metadata

def metadata() -> str

Returns stream schema as dxapi.SchemaDef object.

schema

def schema() -> str

Returns stream schema as dxapi.SchemaDef object. (Alias for metadata)

metadataXml

def metadataXml() -> str

Returns stream schema (in xml format).

scope

def scope() -> StreamScope

Returns stream scope.

version

def version() -> str

Returns stream data format version.

highAvailability

def highAvailability() -> bool

Returns stream memory caching parameter. High availability durable streams are cached on startup.

unique

def unique() -> bool

Returns whether the stream is unique. Unique streams maintain an in-memory cache of recent messages. This concept assumes that stream messages have some field(s) marked as the primary key. The primary key may be a simple field (e.g. symbol) or composite (e.g. symbol and portfolio ID). For each key, the TimeBase runtime maintains a copy of the last message received for this key (cache). Each new consumer receives a snapshot of the current cache at the beginning of a live data subscription.

polymorphic

def polymorphic() -> bool

Returns whether the stream is configured as polymorphic.

periodicity

def periodicity() -> str

Returns Stream periodicity, if known.

options

def options() -> StreamOptions

Returns stream options object.

describe

def describe() -> str

Returns stream DDL description.

changeSchema

def changeSchema(schema: 'SchemaDef',
mappings: 'dict' = None,
defaults: 'dict' = None,
background: bool = False) -> bool

Run schema change task for stream.

Arguments:

  • schema SchemaDef or str(json) - New stream schema;
  • mappings dict - column mappings in format 'TypeName:FieldName', for example: { 'deltix.timebase.api.messages.TradeMessage:size' : 'deltix.timebase.api.messages.TradeMessage:newSizeField' };
  • defaults dict - default values for non-nullable columns, for example: { 'deltix.timebase.api.messages.TradeMessage:newSizeField' : '1.23' };
  • background bool - True to run the task in background mode. In this case the method returns right after the request is sent to the server. You can track the status of the background task with the backgroundProcessInfo() method and abort it with the abortBackgroundProcess() method.

backgroundProcessInfo

def backgroundProcessInfo() -> 'BackgroundProcessInfo'

Gets stream background process information.

abortBackgroundProcess

def abortBackgroundProcess() -> None

Aborts active background process if any exists

listEntities

def listEntities(
instrumentTypes: 'list[InstrumentType]' = None
) -> 'list[InstrumentIdentity]'

Return list of instruments in stream.

Arguments:

  • instrumentTypes list[InstrumentType] - instrument types for which entities should be selected. If None, all entities of stream will be selected.

Returns:

  • list[InstrumentIdentity] - stream instruments.

listSymbols

def listSymbols(instrumentTypes: 'list[InstrumentType]' = None) -> 'list[str]'

Return list of symbols in stream.

Arguments:

  • instrumentTypes list[InstrumentType] - instrument types for which symbols should be selected. If None, all symbols of the stream will be selected.

Returns:

  • list[str] - stream symbols.

truncate

def truncate(timestamp: int,
entities: 'list[InstrumentIdentity] or list[str]' = None) -> bool

Truncates stream data for the given entities from given time

Arguments:

  • timestamp int - Timestamp in millis. If the time is earlier than the stream start time, all stream data will be deleted.
  • entities list[InstrumentIdentity] - A list of entities. If None, all stream entities will be used.

Returns:

  • bool - true, if stream was truncated successfully.

clear

def clear(entities: 'list[InstrumentIdentity] or list[str]' = None) -> bool

Clear stream data for the given entities.

Arguments:

  • entities list[InstrumentIdentity] - A list of entities. If None, all stream entities will be used.

purge

def purge(timestamp: int) -> bool

Deletes stream data that is older than a specified time

Arguments:

  • timestamp int - Purge time in milliseconds.

Returns:

  • bool - true, if stream was purged successfully.

deleteData

def deleteData(
fromMs: int,
toMs: int,
entities: 'list[InstrumentIdentity] or list[str]' = None) -> bool

Deletes stream data for the given entities using specified time range.

Arguments:

  • fromMs int - start timestamp (inclusive), in milliseconds since January 1, 1970 UTC.
  • toMs int - end timestamp (inclusive), in milliseconds since January 1, 1970 UTC. If the time is later than the stream end time, then all stream data will be deleted for the given stream.
  • entities list[InstrumentIdentity] or list[str] - A list of entities. If None, all stream entities will be used.

Returns:

  • bool - true, if stream data was deleted successfully.

deleteStream

def deleteStream() -> bool

Deletes this stream

Returns:

  • bool - true, if stream was deleted successfully.

abortBackgroundProcess

def abortBackgroundProcess() -> bool

Aborts active background process if any exists

lock

def lock(options: 'LockType or LockOptions' = None) -> None

Acquire a lock of this stream. Default lock type is WRITE.

tryLock

def tryLock(options: 'LockType or LockOptions', timeout: 'int') -> None

Blocking operation that attempts to obtain given type of lock on this stream. If lock cannot be obtained during specified timeout operation fails with exception.

unlock

def unlock() -> None

Releases stream lock.
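Since a lock acquired with lock() has to be released with unlock(), the pair is a natural fit for a context manager. The following is a minimal sketch under the assumption that lock() raises on failure, so unlock() is only needed once lock() has returned. The helper locked is hypothetical and is not part of dxapi.

```python
from contextlib import contextmanager


@contextmanager
def locked(stream, options=None):
    """Hold a stream lock for the duration of a with-block.

    options - a LockType or LockOptions value, or None for the default.
    """
    stream.lock(options)
    try:
        yield stream
    finally:
        # Always release, including when the with-body raises.
        stream.unlock()
```

Usage would look like `with locked(stream, dxapi.LockType('WRITE')): ...`, keeping the critical section visually scoped.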

select

def select(timestamp: int, options: SelectionOptions, types: 'list[str]',
entities: 'list[InstrumentIdentity] or list[str]') -> 'TickCursor'

Opens a cursor for reading data from this stream, according to the specified options. The messages are returned from the cursor strictly ordered by time. Within the same exact time stamp, the order of messages is undefined and may vary from call to call, i.e. it is non-deterministic.

Note that the arguments of this method only determine the initial configuration of the cursor. The TickCursor interface provides methods for dynamically re-configuring the subscription, or jumping to a different timestamp.

Arguments:

  • timestamp int - The start timestamp in millis.
  • options SelectionOptions - Selection options.
  • types list[str] - Specified message types to be subscribed. If None, then all types will be subscribed.
  • entities list[InstrumentIdentity] or list[str] - Specified entities to be subscribed. If None, then all entities will be subscribed.

Returns:

  • TickCursor - A cursor used to read messages.

trySelect

@contextmanager
def trySelect(
timestamp: int, options: SelectionOptions, types: 'list[str]',
entities: 'list[InstrumentIdentity] or list[str]') -> 'TickCursor'

Contextmanager version of select. Usage:

with stream.trySelect(timestamp, options, types, entities) as cursor:
    while cursor.next():
        message = cursor.getMessage()

createCursor

def createCursor(options: SelectionOptions) -> 'TickCursor'

Creates a cursor for reading data from this stream, according to the specified options, but initially with a fully restricted filter. The user must call TickCursor.reset at least once, in order to begin retrieving data. This method is equivalent to (but is slightly more optimal than) calling createCursor(options)

Arguments:

  • options SelectionOptions - Selection Options.

Returns:

  • TickCursor - A cursor used to read messages. Never None.

tryCursor

@contextmanager
def tryCursor(options: SelectionOptions) -> 'TickCursor'

Context manager version of createCursor. Usage:

with stream.tryCursor(options) as cursor:
    while cursor.next():
        message = cursor.getMessage()

createLoader

def createLoader(options: LoadingOptions) -> 'TickLoader'

Creates a channel for loading data. The loader must be closed when the loading process is finished.

Arguments:

  • options LoadingOptions - Loading options.

Returns:

  • TickLoader - created loader.

tryLoader

@contextmanager
def tryLoader(options: LoadingOptions) -> 'TickLoader'

Contextmanager version of createLoader. Usage:

with stream.tryLoader(options) as loader:
    loader.send(message)

listSpaces

def listSpaces() -> 'list[str]'

Returns all created "spaces" for the stream. The default space is returned as "" (empty string). If the backing stream does not support spaces, None is returned.

renameSpace

def renameSpace(newName: str, oldName: str) -> None

Rename existing space.

Arguments:

  • newName str - new space name.
  • oldName str - space to rename.

deleteSpaces

def deleteSpaces(spaces: 'list[str]') -> None

Removes the given spaces permanently.

Arguments:

  • spaces list[str] - list of spaces names to delete.

getTimeRange

def getTimeRange(
entities: 'list[InstrumentIdentity] or list[str]' = None
) -> 'list[int]'

Return an inclusive range of times for which the specified entities have data in the database.

Arguments:

  • entities list[InstrumentIdentity] - A list of entities or list of symbols. If empty, return for all.

getSpaceTimeRange

def getSpaceTimeRange(space: str) -> 'list[int]'

Returns an array consisting of two timestamps (from and to), or None if no data was found.

Arguments:

  • space str - space name.
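The two-element result of getTimeRange or getSpaceTimeRange is easier to read once converted to datetime values. The sketch below assumes the timestamps are epoch milliseconds, as elsewhere in this API. to_utc_range is a hypothetical helper and not part of dxapi.

```python
from datetime import datetime, timedelta, timezone

_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_utc_range(time_range):
    """Convert [from_ms, to_ms] to a pair of aware UTC datetimes.

    Returns None when time_range is None (no data found).
    """
    if time_range is None:
        return None
    from_ms, to_ms = time_range
    # timedelta keeps the arithmetic exact, unlike dividing by 1000.0.
    return (_EPOCH + timedelta(milliseconds=from_ms),
            _EPOCH + timedelta(milliseconds=to_ms))
```

A caller might write `to_utc_range(stream.getSpaceTimeRange('myspace'))` and check for None before printing the bounds.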

TickCursor

class TickCursor(object)

A cursor (also known as iterator, or result set) for reading data from a stream. This class provides methods for dynamically reconfiguring the feed, as well as method reset for essentially re-opening the cursor on a completely different timestamp.

To get a cursor, use select method from TickDb or TickStream objects, or call executeQuery to open cursor to QQL result set.

A cursor can also be created with the createCursor method. Such a cursor is not initialized, so it must be configured with types and entities, and positioned at a read time by calling reset:

```
options = dxapi.SelectionOptions()
cursor = tickdb.createCursor(stream, options)
cursor.subscribeToAllEntities()
cursor.subscribeToAllTypes()
cursor.reset(timestamp)
```

next

def next() -> bool

Moves cursor on to the next message. This method blocks until the next message becomes available, or until the cursor is determined to be at the end of the sequence. This method is illegal to call if isAtEnd() returns true.

Returns:

  • bool - false if at the end of the cursor.

getMessage

def getMessage() -> 'InstrumentMessage'

Returns an InstrumentMessage object cursor points at.

isAtEnd

def isAtEnd() -> bool

Returns true if the last call to next() returned false. Returns false if next() has not been called yet. This method is legal to call any number of times at any point in the cursor's lifecycle.

nextIfAvailable

def nextIfAvailable() -> int

Moves the cursor on to the next message if one is available. Unlike next(), this method does not block until the next message becomes available.

Returns:

  • NextResult - OK (0) if new message is available, END_OF_CURSOR (1) if cursor was closed, otherwise, UNAVAILABLE (2)
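A non-blocking read loop built on nextIfAvailable might look like the sketch below. It uses the numeric result codes listed above and relies only on nextIfAvailable() and getMessage(). The function drain_available is hypothetical, and it assumes that a message returned by getMessage() stays valid after the cursor advances. If that does not hold, copy the needed fields before the next call.

```python
# Result codes as documented for nextIfAvailable.
OK, END_OF_CURSOR, UNAVAILABLE = 0, 1, 2


def drain_available(cursor):
    """Read every message that is available right now, without blocking.

    Returns (messages, at_end), where at_end is True once the cursor
    reports END_OF_CURSOR.
    """
    messages = []
    while True:
        result = cursor.nextIfAvailable()
        if result == OK:
            messages.append(cursor.getMessage())
        elif result == END_OF_CURSOR:
            return messages, True
        else:
            # UNAVAILABLE: nothing to read at the moment, try again later.
            return messages, False
```

The caller can do other work between calls and invoke drain_available again until at_end becomes True.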

isClosed

def isClosed() -> bool

Returns true, if cursor was closed

close

def close() -> None

Close the cursor

getCurrentStreamKey

def getCurrentStreamKey() -> str

Return the key of the stream that is the source of the current message.

reset

def reset(timestamp: int,
entities: 'list[InstrumentIdentity] or list[str]' = None) -> None

Reposition the message source to a new point in time, while preserving current subscription.

Arguments:

  • timestamp int - The new position in time in millis.
  • entities 'list[InstrumentIdentity]' - list of entities to reset

subscribeToAllEntities

def subscribeToAllEntities() -> None

Subscribe to all available entities.

clearAllEntities

def clearAllEntities() -> None

Switch to selective subscription mode (if necessary) and clear the list.

addEntity

def addEntity(entity: InstrumentIdentity) -> None

Add the specified entity to subscription.

Special note about options. The following fragment subscribes to the specific option contract "DAV 100417P00085000": cursor.addEntity(dxapi.InstrumentIdentity(dxapi.InstrumentType.OPTION, 'DAV 100417P00085000'))

The following fragment subscribes to the option root (and you will get all instruments with this root): cursor.addEntity(dxapi.InstrumentIdentity(dxapi.InstrumentType.OPTION, 'DAV '))

addSymbol

def addSymbol(symbol: str) -> None

Add the specified symbol to subscription.

addEntities

def addEntities(entities: 'list[InstrumentIdentity]') -> None

Bulk add the specified entities to subscription.

addSymbols

def addSymbols(symbols: 'list[str]') -> None

Bulk add the specified symbols to subscription.

removeEntities

def removeEntities(entities: 'list[InstrumentIdentity]') -> None

Remove the specified entities from subscription.

removeSymbols

def removeSymbols(symbols: 'list[str]') -> None

Remove the specified symbols from subscription.

removeEntity

def removeEntity(entity: InstrumentIdentity) -> None

Remove the specified entity from subscription.

removeSymbol

def removeSymbol(symbol: str) -> None

Remove the specified symbol from subscription.

subscribeToAllTypes

def subscribeToAllTypes() -> None

Subscribe to all available types (no filtering).

addTypes

def addTypes(types: 'list[str]') -> None

Add the specified type names to subscription.

removeTypes

def removeTypes(types: 'list[str]') -> None

Remove the specified types from subscription.

setTypes

def setTypes(types: 'list[str]') -> None

Subscribe to specified types.

add

def add(types: 'list[str]',
entities: 'list[InstrumentIdentity] or list[str]') -> None

Add the specified entities and types to subscription.

Arguments:

  • types list[str] - not-null array of type names to subscribe.
  • entities list[InstrumentIdentity] - not-null array of instruments to subscribe.

remove

def remove(types: 'list[str]',
entities: 'list[InstrumentIdentity] or list[str]') -> None

Remove the specified entities and types from subscription.

Arguments:

  • types list[str] - not-null array of type names to unsubscribe.
  • entities list[InstrumentIdentity] - not-null array of instruments to unsubscribe.

addStreams

def addStreams(streams: 'list[TickStream]') -> None

Add streams to subscription. Current time and filter is used to query data from new sources.

Arguments:

  • streams 'list[TickStream]' - Streams to add.

removeStreams

def removeStreams(streams: 'list[TickStream]') -> None

Remove streams from subscription.

Arguments:

  • streams list[TickStream] - Streams to remove.

removeAllStreams

def removeAllStreams() -> None

Remove all streams from subscription.

setTimeForNewSubscriptions

def setTimeForNewSubscriptions(timestamp: int) -> None

This method affects subsequent "add subscription" methods, such as, for instance, addEntity(). New subscriptions start at the specified time.

Arguments:

  • timestamp int - The time to use.

TickLoader

class TickLoader(object)

Object which consumes messages.

Create loader from TickDb:

options = dxapi.LoadingOptions()
loader = tickdb.createLoader(stream, options)

Create loader from TickStream:

options = dxapi.LoadingOptions()
loader = stream.createLoader(options)

send

def send(message: InstrumentMessage) -> None

This method is invoked to send a message to the object.

Arguments:

  • message InstrumentMessage - A temporary buffer with the message. By convention, the message is only valid for the duration of this call.

flush

def flush() -> None

Flushes all buffered messages by sending them to the server. Note that calling send alone does not guarantee that all messages have been delivered to and stored on the server.

isClosed

def isClosed() -> bool

Returns true if the loader was closed.

close

def close() -> None

Flushes and closes the loader
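The send, flush and close methods above can be combined as in the following minimal sketch. It relies only on the documented behaviour that send may buffer and that close flushes before closing. The send_all helper, its flush_every parameter and the RecordingLoader stand-in are hypothetical and are not part of dxapi.

```python
def send_all(loader, messages, flush_every=1000):
    """Send messages, flush every `flush_every` sends, and always close.

    `loader` is any object exposing send(), flush() and close()
    with the signatures documented for TickLoader.
    """
    sent = 0
    try:
        for message in messages:
            loader.send(message)
            sent += 1
            if sent % flush_every == 0:
                loader.flush()
    finally:
        # close() is documented to flush and then close the loader.
        loader.close()
    return sent


class RecordingLoader:
    """Stand-in that records calls; not part of dxapi."""

    def __init__(self):
        self.calls = []
        self.closed = False

    def send(self, message):
        self.calls.append(('send', message))

    def flush(self):
        self.calls.append(('flush',))

    def close(self):
        self.calls.append(('close',))
        self.closed = True

    def isClosed(self):
        return self.closed


fake = RecordingLoader()
count = send_all(fake, ['a', 'b', 'c'], flush_every=2)
```

The try/finally ensures the loader is closed even if a send raises, so buffered messages are not left behind by an early exit.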

addListener

def addListener(listener: 'ErrorListener') -> None

Register an error listener. All data writing errors will be delivered to the listener. Note that the listener instance should be stored in a variable and shouldn't be destroyed until you close the loader.

Arguments:

  • listener ErrorListener - error listener to register.

removeListener

def removeListener(listener: 'ErrorListener') -> None

Unsubscribe registered error listener.

Arguments:

  • listener ErrorListener - error listener to unsubscribe.

nErrorListeners

def nErrorListeners() -> int

Returns number of registered error listeners

registerType

def registerType(type: str) -> int

Registers the type of the messages to be sent and returns its type id. For performance reasons, you can specify the type id instead of the type name, for example:

message = dxapi.InstrumentMessage()
message.typeId = loader.registerType("deltix.timebase.api.messages.universal.PackageHeader")
# alternatively, you could write:
# message.typeName = "deltix.timebase.api.messages.universal.PackageHeader"
loader.send(message)

Arguments:

  • type str - name of type to register.

Returns:

  • int - id of registered type.
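When many messages share a few types, the id returned by registerType can be looked up once and reused. The following is a minimal sketch of such a cache, not a dxapi feature. TypeIdCache and CountingLoader are hypothetical names, and the ids handed out by the stand-in are made up.

```python
class TypeIdCache:
    """Remember type ids so registerType is called once per type name."""

    def __init__(self, loader):
        self._loader = loader
        self._ids = {}

    def get(self, type_name):
        if type_name not in self._ids:
            self._ids[type_name] = self._loader.registerType(type_name)
        return self._ids[type_name]


class CountingLoader:
    """Stand-in that hands out made-up ids; not part of dxapi."""

    def __init__(self):
        self.register_calls = 0

    def registerType(self, type_name):
        self.register_calls += 1
        return self.register_calls - 1


loader = CountingLoader()
cache = TypeIdCache(loader)
header = "deltix.timebase.api.messages.universal.PackageHeader"
first = cache.get(header)
second = cache.get(header)
```

With a real loader, the cached id would then be assigned to message.typeId before each send, as in the example above.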

registerInstrument

def registerInstrument(type: InstrumentType, symbol: str) -> int

Registers the instrument of the messages to be sent and returns its instrument id. For performance reasons, you can specify the instrument id instead of the symbol and instrument type, for example:

message = dxapi.InstrumentMessage()
message.instrumentId = loader.registerInstrument(dxapi.InstrumentType.EQUITY, 'AAPL')
# alternatively, you could write:
# message.instrumentType = 'EQUITY'
# message.symbol = 'AAPL'
loader.send(message)

Arguments:

  • type InstrumentType - type of instrument.
  • symbol str - instrument ticker.

Returns:

  • int - id of registered instrument.

registerInstrumentIdentity

def registerInstrumentIdentity(instrument: InstrumentIdentity) -> int

Registers the instrument of the messages to be sent and returns its instrument id. For performance reasons, you can specify the instrument id instead of the symbol and instrument type, for example:

message = dxapi.InstrumentMessage()
message.instrumentId = loader.registerInstrumentIdentity(dxapi.InstrumentIdentity(dxapi.InstrumentType.EQUITY, 'AAPL'))
# alternatively, you could write:
# message.instrumentType = 'EQUITY'
# message.symbol = 'AAPL'
loader.send(message)

Arguments:

  • instrument InstrumentIdentity - instrument identity (instrument type and symbol).

Returns:

  • int - id of registered instrument.

ErrorListener

class ErrorListener(object)

Listener for errors related to loading/sending data into a TickLoader.

Usage:

```
class MyErrorListener(dxapi.ErrorListener):
    def onError(self, errorClass, errorMsg):
        print('Error: ' + errorMsg)

listener = MyErrorListener()  # keep a reference until the loader is closed
loader.addListener(listener)
loader.send(message)
...

loader.removeListener(listener)
loader.close()
```
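Instead of printing, a listener can collect errors for inspection after loading. This is a minimal sketch that assumes onError receives the error class and message as in the usage above. CollectingErrorListener is a hypothetical name, and the fallback base class exists only so the sketch runs where dxapi is not installed.

```python
try:
    import dxapi
    _ListenerBase = dxapi.ErrorListener
except ImportError:
    _ListenerBase = object  # stand-in so the sketch runs without dxapi


class CollectingErrorListener(_ListenerBase):
    """Store every reported error as an (errorClass, errorMsg) tuple."""

    def __init__(self):
        super().__init__()
        self.errors = []

    def onError(self, errorClass, errorMsg):
        self.errors.append((errorClass, errorMsg))


listener = CollectingErrorListener()
# The loader would call onError; it is called by hand here to illustrate.
# 'SomeError' and the message text are made-up values.
listener.onError('SomeError', 'message rejected')
```

As with the usage above, the listener instance should be kept in a variable until the loader is closed; listener.errors can then be checked after close().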

SchemaDef

class SchemaDef()

TimeBase stream schema definition class.

Example:

```
schema = stream.metadata()
tradeType = schema.findType('TradeMessage')
```

Properties:

  • all list[TypeDef] - all types of stream schema (including content, non-content classes and enums).
  • types list[TypeDef] - content (top level) types of stream.

TypeDef

class TypeDef()

TimeBase stream type definition class.

Example:

```
schema = stream.metadata()
tradeType = schema.findType('TradeMessage')
```

Properties:

  • name str - name of type.
  • title str - title of type.
  • fields list[FieldDef] - list of type fields.
  • isEnum bool - true, if type is enum.
  • parent str - name of parent type.
  • isAbstract bool - true, if type is abstract.

FieldDef

class FieldDef()

TimeBase type field definition class.

Example:

```
schema = stream.metadata()
tradeType = schema.findType('TradeMessage')
priceField = tradeType.findField('price')
```

Properties:

  • name str - name of field.
  • title str - title of field.
  • description str - description of field.
  • type DataTypeDef - type of field.
  • static bool - true, if field is static.
  • value str - static value of field.
  • relativeTo str - field name (used to specify that field decoding depends on another field value).
  • primaryKey bool - true, if field is a part of the message's primary key.
  • hide bool - true, if field is hidden.

DataTypeDef

class DataTypeDef()

TimeBase field type definition class.

Example:

```
schema = stream.metadata()
tradeType = schema.findType('TradeMessage')
priceField = tradeType.findField('price')
priceField.type.encoding = 'IEEE64'
```

Properties:

  • name str - name of field type.
  • encoding str - encoding of field type.
  • nullable bool - true for nullable field types.
  • types list[str] - list of schema type names for polymorphic types (for objects).
  • elementType str - type name of array element data type (for arrays).
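The properties of SchemaDef, TypeDef, FieldDef and DataTypeDef listed above can be walked together to summarize a stream schema. The following is a minimal sketch that uses only those documented property names. describe_schema is a hypothetical helper, and the SimpleNamespace objects are stand-ins with made-up values in place of a real stream.metadata() result.

```python
from types import SimpleNamespace


def describe_schema(schema):
    """Return one 'Type.field: typeName (encoding)' line per field."""
    lines = []
    for type_def in schema.types:  # content (top level) types
        for field in type_def.fields:
            lines.append('{}.{}: {} ({})'.format(
                type_def.name, field.name,
                field.type.name, field.type.encoding))
    return lines


# Stand-in schema; the type and field values below are illustrative only.
price = SimpleNamespace(
    name='price', type=SimpleNamespace(name='FLOAT', encoding='IEEE64'))
size = SimpleNamespace(
    name='size', type=SimpleNamespace(name='FLOAT', encoding='IEEE64'))
trade = SimpleNamespace(name='TradeMessage', fields=[price, size])
schema = SimpleNamespace(types=[trade], all=[trade])

summary = describe_schema(schema)
```

With a real stream, describe_schema(stream.metadata()) would be the equivalent call, assuming the properties behave as listed.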