Python API
InstrumentMessage
class InstrumentMessage(object)
setTimestamp
def setTimestamp(timestamp: int)
Set timestamp for message.
Arguments:
timestamp
- int, epoch time in millis.
setDateTime
def setDateTime(timestamp: datetime)
Set timestamp for message from datetime.
Arguments:
timestamp
- datetime object.
getDateTime
def getDateTime()
Returns timestamp of message as datetime object.
setNanoTime
def setNanoTime(timestamp: int)
Set timestamp for message.
Arguments:
timestamp
- int, epoch time in nanos.
getNanoTime
def getNanoTime()
Returns timestamp of message (epoch time in nanos).
InstrumentType
class InstrumentType(object)
Represents instrument type.
Object of this class can be created from instrument type name (str), or int value:
Example:
fxType = dxapi.InstrumentType('FX')
indexType = dxapi.InstrumentType(dxapi.InstrumentType.INDEX)Possible values:
EQUITY
OPTION
FUTURE
BOND
FX
INDEX
ETF
CUSTOM
SIMPLE_OPTION
EXCHANGE
TRADING_SESSION
STREAM
DATA_CONNECTOR
EXCHANGE_TRADED_SYNTHETIC
SYSTEM
CFD
UNKNOWN
InstrumentIdentity
class InstrumentIdentity(object)
Represents instrument, contains from instrument type and instrument symbol.
Example:
appleId = dxapi.InstrumentIdentity(dxapi.InstrumentType('EQUITY'), 'AAPL')
btcUsdId = dxapi.InstrumentIdentity(dxapi.InstrumentType.FX, 'BTC/USD')
StreamScope
class StreamScope(object)
Determines the scope of a stream's durability, if any.
Example:
scope = dxapi.StreamScope('TRANSIENT')
Possible values:
DURABLE,
EXTERNAL_FILE,
TRANSIENT,
RUNTIME
WriteMode
class WriteMode(object)
APPEND: Adds only new data into a stream without truncations. REPLACE: Adds data into a stream and removes previous data older that first message time [truncate(first message time + 1)]. REWRITE: Default. Adds data into a stream and removes previous data by truncating using first message time. [truncate(first message time)]. TRUNCATE: Stream truncated every time when loader writes a messages earlier than last message time.
Example:
mode = dxapi.WriteMode('TRUNCATE')
Possible values:
APPEND,
REPLACE,
REWRITE,
TRUNCATE
SelectionOptions
class SelectionOptions(object)
Options for selecting data from a stream.
Example:
so = dxapi.SelectionOptions()
so._from = 0
so.to = 100000
so.useCompression = False
so.live = True
so.reverse = False
so.allowLateOutOfOrder = True
so.realTimeNotification = True
so.minLatency = FalseProperties:
_from
int - Start timestamp in millis.
to
int - End timestamp in millis.
useCompression
bool - Use compression.
live
bool - Instead of returning false from next () at the end of the stream, wait for live data to be added.
reverse
bool - Specify cursor direction.
allowLateOutOfOrder
bool - Output out-of-order late messages. Timebase consumers receive historical messages they requested strictly ordered by their time. For scenarios when new messages arrive in the middle of consumer's session (So called 'live' mode) it is possible that newly arrived message has a timestamp in already consumer time region. In this cases this flag allows consumer to receive these 'late' messages even if they out of order with respect to current baseline time.
NOTE
- Late Messages that are timestamped prior to consumer's select time or last reset time will not be delivered even with this flag enabled.
realTimeNotification
bool - Enabled/Disables sending system messages when cursor switches from historical to realtime mode.
minLatency
bool - try to receive messages ASAP, with minimal buffering. Can potentially reduce max throughput and increase CPU use / network bandwidth usage.
LoadingOptions
class LoadingOptions(object)
Options for loading data into a stream.
Example:
lo = dxapi.LoadingOptions()
lo.writeMode = dxapi.WriteMode('TRUNCATE')
lo.space = 'myspace'
lo.minLatency = FalseProperties:
writeMode
WriteMode - see WriteMode class description.
minLatency
bool - try to send messages ASAP, with minimal buffering. Can potentially reduce max throughput and increase CPU use / network bandwidth usage.
space
str - Data Partition. Contains unique number of instruments or Time Ranges.
StreamOptions
class StreamOptions(object)
Stream definition attributes.
Example:
so = dxapi.StreamOptions()
so.name('Test Name')
so.description('Test Description')
so.owner('Test Owner')
so.metadata(schema)
so.scope = dxapi.StreamScope('TRANSIENT')
so.distributionFactor = 1
so.polymorphic = True
so.periodicity = 'STATIC'
so.unique = True
so.duplicatesAllowed = False
so.version = '4.3'
db.createStream(key, so)Properties:
scope
StreamScope - Determines persistent properties of a stream.
distributionFactor
int - The number of M-files into which to distribute the data. Supply MAX_DISTRIBUTION to keep a separate file for each instrument (default).
duplicatesAllowed
bool - Indicates that loader will ignore binary similar messages(for 'unique' streams only).
highAvailability
bool - High availability durable streams are cached on startup.
unique
bool - Unique streams maintain in-memory cache of resent messages. This concept assumes that stream messages will have some field(s) marked as primary key. Primary key may be a simple field (e.g. symbol) or composite (e.g. symbol and portfolio ID). For each key TimeBase runtime maintains a copy of the last message received for this key (cache). Each new consumer will receive a snapshot of current cache at the beginning of live data subscription.polymorphic (bool).
periodicity
bool - Stream periodicity, if known.
version
str - Stream format version. Supported versions are: '5.0' - Editable past format, TS data file; '4.3' - 'Classic' storage format, allocation free.
name
def name(name: str = None) -> None
Optional user-readable name.
description
def description(description: str = None) -> None
Optional multi-line description.
owner
def owner(owner: str = None) -> None
Optional owner of stream. During stream creation it will be set equals to authenticated user name.
location
def location(location: str = None) -> None
Location of the stream (by default null). When defined this attribute provides alternative stream location (rather than default location under QuantServerHome)
distributionRuleName
def distributionRuleName(distributionRuleName: str = None) -> None
Class name of the distribution rule
metadata
def metadata(metadata: 'SchemaDef or str(json)' = None) -> 'SchemaDef'
Stream metadata as dxapi.SchemaDef object.
schema
def schema(metadata: 'SchemaDef or str(json)' = None) -> 'SchemaDef'
Stream metadata as dxapi.SchemaDef object. Alias for metadata().
metadataXml
def metadataXml(metadataXml: str = None) -> None
Stream metadata in XML format.
QueryParameter
class QueryParameter(object)
Input parameter definition for a prepared statement.
LockType
class LockType(object)
Represents lock type.
Object of this class can be created from instrument type name (str), or int value:
Example:
readLockType = dxapi.LockType('READ')
writeLockType = dxapi.LockType('WRITE')
writeLockType = dxapi.LockType(dxapi.LockType.READ_LOCK)Possible values:
READ,
WRITE
LockOptions
class LockOptions(object)
Options for stream locking.
Example:
lo = dxapi.LockOptions()
lo.type = dxapi.LockType('WRITE')
lo.startTime = 0
lo.endTime = 1000Properties:
type
LockType - type of lock;
startTime
int - Start time for ranged write locks (ms);
endTime
int - End time for ranged write locks (ms).
ExecutionStatus
class ExecutionStatus(object)
Background process status.
Object of this class can be created from instrument type name (str), or int value:
Possible values:
```
NONE,
RUNNING,
COMPLETED,
ABORTED,
FAILED
```
BackgroundProcessInfo
class BackgroundProcessInfo(object)
Background process info.
Example:
stream.changeSchema(schema, None, defaults, True)
while str(stream.backgroundProcessInfo().status) != 'Completed':
print('Waiting task to finish...')
time.sleep(1)Properties:
status
ExecutionStatus - status of process;
progress
double - progress of process (from 0.0 to 1.0);
startTime
int - task start time in millis;
endTime
int - task start time in millis.
TickDb
class TickDb(object)
The top-level implementation to the methods of the Deltix Tick Database engine. Instances of this class are created by static method createFromUrl:
db = dxapi.TickDb.createFromUrl('dxtick://localhost:8011')
or
db = dxapi.TickDb.createFromUrl('dxtick://localhost:8011', 'user', 'password')
createFromUrl
@staticmethod
def createFromUrl(url: str,
user: str = None,
password: str = None) -> "TickDb"
Creates a new database instance with the specified root folder, or URL.
Arguments:
url
str - Connection URL.user
str - User.password
str - Password.Returns:
TickDb
- An un-opened TickDB instance.
openFromUrl
@staticmethod
@contextmanager
def openFromUrl(url: str,
readonly: bool,
user: str = None,
password: str = None)
Creates a new database instance with the specified root folder, or URL, and opens it.
Arguments:
url
str - Connection URL.readonly
bool - Open data store in read-only mode.user
str - User.password
str - Password.Returns:
TickDb
- An opened TickDB instance.
setApplicationName
def setApplicationName(name: str) -> None
Set custom visible name of application.
Arguments:
name
str - Application name.
isReadOnly
def isReadOnly() -> bool
Determines whether the store is open as read-only.
isOpen
def isOpen() -> bool
Determines whether the store is open.
open
def open(readOnlyMode: bool) -> bool
Open the data store.
Arguments:
readOnlyMode
bool - Open data store in read-only mode.
close
def close() -> None
Closes data store.
format
def format() -> bool
Create a new object on disk and format internally. The data store is left open for read-write at the end of this method.
generateSchema
def generateSchema(types: 'list[str]') -> str
Generates SchemaDef object from specified content types.
schema = db.generateSchema(['deltix.timebase.api.messages.TradeMessage',
'deltix.timebase.api.messages.BestBidOfferMessage'])
listStreams
def listStreams() -> 'list[TickStream]'
Enumerates existing streams.
Returns:
list[TickStream]
- An array of existing stream objects.
getStream
def getStream(key: str) -> 'TickStream'
Looks up an existing stream by key.
Arguments:
key
str - Identifies the stream.Returns:
TickStream
- A stream object, or None if the key was not found.
createStream
def createStream(key: str, options: StreamOptions) -> 'TickStream'
Creates a new stream within the database.
Arguments:
key
str - A required key later used to identify the stream.options
StreamOptions - Options for creating the stream.Returns:
TickStream
- A new instance of TickStream.
createFileStream
def createFileStream(key: str, dataFile: str) -> 'TickStream'
Creates a new stream mount to the given data file.
Arguments:
key
str - A required key later used to identify the stream.dataFile
str - Path to the data file (on server side).Returns:
TickStream
- A new instance of TickStream.
createCursor
def createCursor(stream: 'TickStream',
options: SelectionOptions) -> 'TickCursor'
Opens an initially empty cursor for reading data from multiple streams, according to the specified options. The messages are returned from the cursor strictly ordered by time. Within the same exact timestamp, the order of messages is undefined and may vary from call to call, i.e. it is non-deterministic.
The cursor is returned initially empty and must be reset. The TickCursor class provides methods for dynamically re-configuring the subscription, or jumping to a different timestamp.
Arguments:
stream
TickStream - Stream from which data will be selected.options
SelectionOptions - Selection options.Returns:
TickCursor
- A cursor used to read messages.
tryCursor
@contextmanager
def tryCursor(stream: 'TickStream', options: SelectionOptions) -> 'TickCursor'
contextmanager version of createCursor. Usage:
with db.tryCursor(stream, options) as cursor:
while cursor.next():
message = cursor.getMessage()
select
def select(timestamp: int, streams: 'list[TickStream]',
options: SelectionOptions, types: 'list[str]',
entities: 'list[InstrumentIdentity]') -> 'TickCursor'
Opens a cursor for reading data from multiple streams, according to the specified options. The messages are returned from the cursor strictly ordered by time. Within the same exact time stamp, the order of messages is undefined and may vary from call to call, i.e. it is non-deterministic.
Note that the arguments of this method only determine the initial configuration of the cursor. The TickCursor clsas provides methods for dynamically re-configuring the subscription, or jumping to a different timestamp.
Arguments:
timestamp
int - The start timestamp in millis.streams
list[TickStream] - Streams from which data will be selected.options
SelectionOptions - Selection options.types
list[str] - Specified message types to be subscribed. If null, then all types will be subscribed.entities
list[InstrumentIdentity] - Specified entities to be subscribed. If null, then all entities will be subscribed.Returns:
TickCursor
- A cursor used to read messages.
trySelect
@contextmanager
def trySelect(timestamp: int, streams: 'list[TickStream]',
options: SelectionOptions, types: 'list[str]',
entities: 'list[InstrumentIdentity]') -> 'TickCursor'
Contextmanager version of select. Usage:
with db.trySelect(timestamp, streams, options, types, entities) as cursor:
while cursor.next():
message = cursor.getMessage()
createLoader
def createLoader(stream: 'TickStream',
options: LoadingOptions) -> 'TickLoader'
Creates a channel for loading data. The loader must be closed when the loading process is finished.
Arguments:
stream
TickStream - stream for loading data.options
SelectionOptions - Loading Options.Returns:
TickLoader
- created loader.
tryLoader
@contextmanager
def tryLoader(stream: 'TickStream', options: LoadingOptions) -> 'TickLoader'
Contextmanager version of createLoader. Usage:
with db.tryLoader(stream, options) as loader: loader.send(message)
executeQuery
def executeQuery(query: str,
options: SelectionOptions = None,
timestamp: int = JAVA_LONG_MIN_VALUE,
entities: 'list[InstrumentIdentity]' = None,
params: 'list[QueryParameter]' = []) -> 'TickCursor'
Execute Query and creates a message source for reading data from it, according to the specified options. The messages are returned from the cursor strictly ordered by time. Within the same exact time stamp, the order of messages is undefined and may vary from call to call, i.e. it is non-deterministic.
Arguments:
query
str - Query text element.options
SelectionOptions - Selection options.timestamp
int - The start timestamp in millis.entities
list[InstrumentIdentity] - Specified entities to be subscribed. If null, then all entities will be subscribed.params
list[QueryParameter] - The parameter values of the query.Returns:
TickCursor
- An iterable message source to read messages.
tryExecuteQuery
@contextmanager
def tryExecuteQuery(query: str,
options: SelectionOptions = None,
timestamp: int = JAVA_LONG_MIN_VALUE,
entities: 'list[InstrumentIdentity]' = None,
params: 'list[QueryParameter]' = []) -> 'TickCursor'
Contextmanager version of executeQuery. Usage:
with db.tryExecuteQuery('select * from stream') as cursor:
while cursor.next():
message = cursor.getMessage()
TickStream
class TickStream(object)
The stream is a time series of messages for a number of financial instruments ('entities'). Messages can be price bars, trade ticks, bid/offer ticks, or any of the many more built-in and user-defined types. In the simplest case, a database will have a single stream of data. Multiple streams can be used to represent data of different frequencies, or completely different factors. For instance, separate streams can represent 1-minute price bars and ticks for the same set of entities. Or, you can have price bars and volatility bars in separate streams.
Get stream:
```
stream = tickdb.getStream('stream_key')
```List stream:
```
streams = tickdb.listStreams()
```
key
def key() -> str
Returns the key, which uniquely identifies the stream within its database.
name
def name() -> str
Returns a user-readable short name.
distributionFactor
def distributionFactor() -> int
Returns the target number of files to be used for storing data.
description
def description() -> str
Returns a user-readable multi-line description.
owner
def owner() -> str
Returns stream owner.
location
def location() -> str
Returns stream location.
metadata
def metadata() -> str
Returns stream schema as dxapi.SchemaDef object.
schema
def schema() -> str
Returns stream schema as dxapi.SchemaDef object. (Alias for metadata)
metadataXml
def metadataXml() -> str
Returns stream schema (in xml format).
scope
def scope() -> StreamScope
Returns stream schema (in xml format).
version
def version() -> str
Returns stream data format version.
highAvailability
def highAvailability() -> bool
Returns stream memory caching parameter. High availability durable streams are cached on startup.
unique
def unique() -> bool
Unique streams maintain in-memory cache of resent messages. This concept assumes that stream messages will have some field(s) marked as primary key. Primary key may be a simple field (e.g. symbol) or composite (e.g. symbol and portfolio ID). For each key TimeBase runtime maintains a copy of the last message received for this key (cache). Each new consumer will receive a snapshot of current cache at the beginning of live data subscription.
polymorphic
def polymorphic() -> bool
Returns whether the stream is configured as polymorphic.
periodicity
def periodicity() -> str
Returns Stream periodicity, if known.
options
def options() -> StreamOptions
Returns stream options object.
describe
def describe() -> str
Returns stream DDL description.
changeSchema
def changeSchema(schema: 'SchemaDef',
mappings: 'dict' = None,
defaults: 'dict' = None,
background: bool = False) -> bool
Run schema change task for stream.
Arguments:
schema (SchemaDef or str(json)): New stream schema;
mappings
dict - column mappings in format 'TypeName:FiledName', for example: { 'deltix.timebase.api.messages.TradeMessage:size' : 'deltix.timebase.api.messages.TradeMessage:newSizeField' };
defaults
dict - default values for not nullable columns. For example, { 'deltix.timebase.api.messages.TradeMessage:newSizeField' : '1.23' };
background
bool - true to run task in background mode, in this case method returns right after rest query to server. You can track status of background task with backgroundProcessInfo() method. You can abort task with abortBackgroundProcess() method.
backgroundProcessInfo
def backgroundProcessInfo() -> 'BackgroundProcessInfo'
Gets stream background process information.
abortBackgroundProcess
def abortBackgroundProcess() -> None
Aborts active background process if any exists
listEntities
def listEntities(
instrumentTypes: 'list[InstrumentType]' = None
) -> 'list[InstrumentIdentity]'
Return an inclusive range of times for which the specified entities have data in the database.
Arguments:
instrumentTypes
list[InstrumentType] - instrument types for which entities should be selected. If None, all entities of stream will be selected.Returns:
list[InstrumentIdentity]
- selected entities.
truncate
def truncate(timestamp: int,
entities: 'list[InstrumentIdentity]' = None) -> bool
Truncates stream data for the given entities from given time
Arguments:
timestamp
int - Timestamp in millis. If time less than stream start time, then all stream data will be deleted.entities
list[InstrumentIdentity] - A list of entities. If None, all stream entities will be used.Returns:
bool
- true, if stream was truncated successfully.
clear
def clear(entities: 'list[InstrumentIdentity]' = None) -> bool
Clear stream data for the given entities.
Arguments:
entities
list[InstrumentIdentity] - A list of entities. If None, all stream entities will be used.
purge
def purge(timestamp: int) -> bool
Deletes stream data that is older than a specified time
Arguments:
timestamp (int):Purge time in milliseconds.
Returns:
bool
- true, if stream was purged successfully.
deleteStream
def deleteStream() -> bool
Deletes this stream
Returns:
bool
- true, if stream was deleted successfully.
abortBackgroundProcess
def abortBackgroundProcess() -> bool
Aborts active background process if any exists
lock
def lock(options: 'LockType or LockOptions' = None) -> None
Acquire a lock of this stream. Default lock type is WRITE.
tryLock
def tryLock(options: 'LockType or LockOptions', timeout: 'int') -> None
Blocking operation that attempts to obtain given type of lock on this stream. If lock cannot be obtained during specified timeout operation fails with exception.
unlock
def unlock() -> None
Releases stream lock.
select
def select(timestamp: int, options: SelectionOptions, types: 'list[str]',
entities: 'list[InstrumentIdentity]') -> 'TickCursor'
Opens a cursor for reading data from this stream, according to the specified options. The messages are returned from the cursor strictly ordered by time. Within the same exact time stamp, the order of messages is undefined and may vary from call to call, i.e. it is non-deterministic.
Note that the arguments of this method only determine the initial configuration of the cursor. The TickCursor interface provides methods for dynamically re-configuring the subscription, or jumping to a different timestamp.
Arguments:
timestamp
int - The start timestamp in millis.options
SelectionOptions - Selection options.types
list[str] - Specified message types to be subscribed. If null, then all types will be subscribed.entities
list[InstrumentIdentity] - Specified entities to be subscribed. If null, then all entities will be subscribed.Returns:
TickCursor
- A cursor used to read messages.
trySelect
@contextmanager
def trySelect(timestamp: int, options: SelectionOptions, types: 'list[str]',
entities: 'list[InstrumentIdentity]') -> 'TickCursor'
Contextmanager version of select. Usage:
with stream.trySelect(timestamp, options, types, entities) as cursor:
while cursor.next():
message = cursor.getMessage()
createCursor
def createCursor(options: SelectionOptions) -> 'TickCursor'
Creates a cursor for reading data from this stream, according to the specified options, but initially with a fully restricted filter. The user must call TickCursor.reset at least once, in order to begin retrieving data. This method is equivalent to (but is slightly more optimal than) calling createCursor(options)
Arguments:
options
SelectionOptions - Selection Options.Returns:
A cursor used to read messages. Never null.
tryCursor
@contextmanager
def tryCursor(options: SelectionOptions) -> 'TickCursor'
contextmanager version of createCursor. Usage:
with stream.tryCursor(options) as cursor:
while cursor.next():
message = cursor.getMessage()
createLoader
def createLoader(options: LoadingOptions) -> 'TickLoader'
Creates a channel for loading data. The loader must be closed when the loading process is finished.
Arguments:
options
SelectionOptions - Loading Options.Returns:
TickLoader
- created loader.
tryLoader
@contextmanager
def tryLoader(options: LoadingOptions) -> 'TickLoader'
Contextmanager version of createLoader. Usage:
with stream.tryLoader(options) as loader:
loader.send(message)
listSpaces
def listSpaces() -> 'list[str]'
Returns all created "spaces" for the stream. Default space returns as "" (empty string). If backing stream does not support spaces None will be returned.
renameSpace
def renameSpace(newName: str, oldName: str) -> None
Rename existing space.
Arguments:
nameName
str - space to rename.oldName
str - new space name.
deleteSpaces
def deleteSpaces(spaces: 'list[str]') -> None
Removed given 'spaces' permanently.
Arguments:
spaces
list[str] - list of spaces names to delete.
getTimeRange
def getTimeRange(entities: 'list[InstrumentIdentity]' = None) -> 'list[int]'
Return an inclusive range of times for which the specified entities have data in the database.
Arguments:
entities
list[InstrumentIdentity] - A list of entities. If empty, return for all.
getSpaceTimeRange
def getSpaceTimeRange(space: str) -> 'list[int]'
An array consisting of two long timestamps (from and to) or None if no data was found.
Arguments:
space
str - space name.
TickCursor
class TickCursor(object)
A cursor (also known as iterator, or result set) for reading data from a stream. This class provides methods for dynamically reconfiguring the feed, as well as method reset for essentially re-opening the cursor on a completely different timestamp.
To get a cursor, use select method from TickDb or TickStream objects, or call executeQuery to open cursor to QQL result set.
Also cursor can be created with createCursor method, but it will be not initialized cursor, so cursor should be configured with types, entities and read time calling reset:
```
options = dxapi.SelectionOptions()
cursor = tickdb.createCursor(stream, options)
cursor.subscribeToAllEntities()
cursor.subscribeToAllTypes()
cursor.reset(timestamp)
```
next
def next() -> bool
Moves cursor on to the next message. This method blocks until the next message becomes available, or until the cursor is determined to be at the end of the sequence. This method is illegal to call if isAtEnd() returns true.
Returns:
bool
- false if at the end of the cursor.
getMessage
def getMessage() -> 'InstrumentMessage'
Returns an InstrumentMessage object cursor points at.
isAtEnd
def isAtEnd() -> bool
Returns true if the last call to next() returned false. Returns false if next() has not been called yet. This method is legal to call any number of times at any point in the cursor's lifecycle.
nextIfAvailable
def nextIfAvailable() -> int
Moves cursor on to the next message, but this method NOT blocks until the next message becomes available.
Returns:
NextResult
- OK (0) if new message is available, END_OF_CURSOR (1) if cursor was closed, otherwise, UNAVAILABLE (2)
isClosed
def isClosed() -> bool
Returns true, if cursor was closed
close
def close() -> None
Close the cursor
getCurrentStreamKey
def getCurrentStreamKey() -> str
Return the key of the stream that is the source of the current message.
reset
def reset(timestamp: int, entities: 'list[InstrumentIdentity]' = None) -> None
Reposition the message source to a new point in time, while preserving current subscription.
Arguments:
timestamp
int - The new position in time in millis.entities
'list[InstrumentIdentity]' - list of entities to reset
subscribeToAllEntities
def subscribeToAllEntities() -> None
Subscribe to all available entities.
clearAllEntities
def clearAllEntities() -> None
Switch to selective subscription mode (if necessary) and clear the list.
addEntity
def addEntity(entity: InstrumentIdentity) -> None
Add the specified entity to subscription. The type and symbol are copied from the incoming object, if necessary, so the argument can be re-used after the call.
Special note about options: The following fragment will subscribe to specific option contract "DAV 100417P00085000": cursor.addEntity(dxapi.InstrumentIdentity(dxapi.InstrumentType.OPTION, 'DAV 100417P00085000'));
While the following will subscribe to option root (and you will get all instruments with this root): cursor.addEntity(dxapi.InstrumentIdentity(dxapi.InstrumentType.OPTION, "DAV "));
addEntities
def addEntities(entities: 'list[InstrumentIdentity]') -> None
Bulk add the specified entities to subscription. The type and symbol are copied from the incoming objects, if necessary, so the arguments can be re-used after the call.
removeEntities
def removeEntities(entities: 'list[InstrumentIdentity]') -> None
Remove the specified entities from subscription. The type and symbol are copied from the incoming objects, if necessary, so the arguments can be re-used after the call.
removeEntity
def removeEntity(entity: InstrumentIdentity) -> None
Remove the specified entity from subscription. The type and symbol are copied from the incoming object, if necessary, so the argument can be re-used after the call.
subscribeToAllTypes
def subscribeToAllTypes() -> None
Subscribe to all available types (no filtering).
addTypes
def addTypes(types: 'list[str]') -> None
Add the specified type names to subscription.
removeTypes
def removeTypes(types: 'list[str]') -> None
Remove the specified types from subscription.
setTypes
def setTypes(types: 'list[str]') -> None
Subscribe to specified types.
add
def add(types: 'list[str]', entities: 'list[InstrumentIdentity]') -> None
Add the specified entities and types to subscription. The type and symbol are copied from the incoming object, if necessary, so the argument can be re-used after the call.
Arguments:
types
list[str] - not-null array of type names to subscribe.entities
list[InstrumentIdentity] - not-null array of instruments to subscribe.
remove
def remove(types: 'list[str]', entities: 'list[InstrumentIdentity]') -> None
Remove the specified entities and types from subscription. The type and symbol are copied from the incoming objects, if necessary, so the arguments can be re-used after the call.
Arguments:
types
list[str] - not-null array of type names to unsubscribe.entities
list[InstrumentIdentity] - not-null array of instruments to unsubscribe.
addStreams
def addStreams(streams: 'list[TickStream]') -> None
Add streams to subscription. Current time and filter is used to query data from new sources.
Arguments:
streams
'list[TickStream]' - Streams to add.
removeStreams
def removeStreams(streams: 'list[TickStream]') -> None
Remove streams from subscription.
Arguments:
streams
list[TickStream] - Streams to remove.
removeAllStreams
def removeAllStreams() -> None
Remove all streams from subscription.
setTimeForNewSubscriptions
def setTimeForNewSubscriptions(timestamp: int) -> None
This method affects subsequent "add subscription" methods, such as, for instance, addEntity(). New subscriptions start at the specified time.
Arguments:
timestamp
int - The time to use.
TickLoader
class TickLoader(object)
Object which consumes messages.
Create loader from TickDb: options = dxapi.LoadingOptions() stream = tickdb.createLoader(stream, options)
Create loader from TickStream: options = dxapi.LoadingOptions() stream = stream.createLoader(options)
send
def send(message: InstrumentIdentity) -> None
This method is invoked to send a message to the object.
Arguments:
message
InstrumentIdentity - A temporary buffer with the message. By convention, the message is only valid for the duration of this call.
flush
def flush() -> None
Flushes all buffered messages by sending them to server. Note that calling 'send' method not guaranty that all messages will be delivered and stored to server.
close
def close() -> None
Flushes and closes the loader
addListener
def addListener(listener: 'ErrorListener') -> None
Register error listener. All writing data errors will be delivered to the listener.
Arguments:
listener
ErrorListener - error listener to register.
removeListener
def removeListener(listener: 'ErrorListener') -> None
Unsubscribe registered error listener.
Arguments:
listener
ErrorListener - error listener to unsubscribe.
nErrorListeners
def nErrorListeners() -> int
Returns number of registered error listeners
registerType
def registerType(type: str) -> int
Register type of sending message to get type id. For performance reasons, you could specify type id instead of type name, for example:
message = dxapi.InstrumentMessage()
message.typeId = loader.registerType("deltix.timebase.api.messages.universal.PackageHeader")
// as alternative, you could write:
// message.typeName = "deltix.timebase.api.messages.universal.PackageHeader"
loader.send(message)Arguments:
type
str - name of type to register.Returns:
int
- id of registered type.
registerInstrument
def registerInstrument(type: InstrumentType, symbol: str) -> int
Register instrument of sending message to get instrument id. For performance reasons, you could specify instrument id instead of symbol and instrument type, for example:
message = dxapi.InstrumentMessage()
message.instrumentId = loader.registerInstrument(dxapi.InstrumentType.EQUITY, 'AAPL')
// as alternative, you could write:
// message.instrumentType = 'EQUITY'
// message.symbol = 'AAPL'
loader.send(message)Arguments:
type
InstrumentType - type of instrument.symbol
str - instrument ticker.Returns:
int
- id of registered instrument.
registerInstrumentIdentity
def registerInstrumentIdentity(instrument: InstrumentIdentity) -> int
Register instrument of sending message to get instrument id. For performance reasons, you could specify instrument id instead of symbol and instrument type, for example:
message = dxapi.InstrumentMessage()
message.instrumentId = loader.registerInstrumentIdentity(dxapi.InstrumentIdentity(dxapi.InstrumentType.EQUITY, 'AAPL'))
// as alternative, you could write:
// message.instrumentType = 'EQUITY'
// message.symbol = 'AAPL'
loader.send(message)Arguments:
type
InstrumentType - type of instrument.symbol
str - instrument ticker.Returns:
int
- id of registered instrument.
ErrorListener
class ErrorListener(object)
Listener for errors related to loading/sending data into a TickLoader.
Usage:
```
class MyErrorListener(dxapi.ErrorListener):
def onError(self, errorClass, errorMsg):
print('Error: ' + errorMsg)
listener = MyErrorListener() # save listener
loader.addListener(listener)
loader.send()
...
loader.removeListener(listener)
loader.close()
```
SchemaDef
class SchemaDef()
TimeBase stream schema definition class.
Example:
schema = stream.metadata()
tradeType = schema.findType('TradeMessage')Properties:
all
list[TypeDef] - all types of stream schema (including content, non-content classes and enums).
types
list[TypeDef] - content (top level) types of stream.
TypeDef
class TypeDef()
TimeBase stream type definition class.
Example:
schema = stream.metadata()
tradeType = schema.findType('TradeMessage')Properties:
name
str - name of type.
title
str - title of type.
fields
list[FieldDef] - list of type fields.
isEnum
bool - true, if type is enum.
parent
str - name of parent type.
isAbstract
bool - true, if type if abstract.
FieldDef
class FieldDef()
TimeBase type field definition class.
Example:
schema = stream.metadata()
tradeType = schema.findType('TradeMessage')
priceField = tradeType.findField('price')Properties:
name
str - name of field.
title
str - title of field.
description
str - description of field.
type
DataTypeDef - type of field.
static
bool - true, if field is static.
value
str - static value of field.
relativeTo
str - field name (used to specify that field decoding depends on another field value).
primaryKey
bool - true, if field is a part of the message's primary key.
hide
bool - true, if field is hidden.
DataTypeDef
class DataTypeDef()
TimeBase field type definition class.
Example:
schema = stream.metadata()
tradeType = schema.findType('TradeMessage')
priceField = tradeType.findField('price')
priceField.type.encoding = 'IEEE64'Properties:
name
str - name of field type.
encoding
str - encoding of field type.
nullable
bool - true for nullable field types.
types
list[str] - list of schema type names for polymorphic types.
elementType
str - type name of array element data type.