S3/Athena

Amazon Simple Storage Service (Amazon S3) is a scalable, high-speed, web-based cloud storage service designed for online backup, data archiving, and applications on Amazon Web Services. Once you set up data streaming into S3, you can use serverless services such as Athena and QuickSight to run data analytics.

This document describes how to move data between TimeBase and S3.

Configuration

Create an S3 bucket using the AWS Management Console as described in the AWS S3 Guide. You will need the bucket name and region for Execution Server configuration. We recommend creating a separate AWS user, timebase-s3-loader, for Execution Server to use when loading data into the S3 bucket. Grant this user only read and write permissions for this bucket. See the IAM Permissions section for an example policy.

To create API access keys:

  1. Log in to AWS Management Console.
  2. Click on the username at the top.
  3. Enter My Security Credentials.
  4. Under Access keys for CLI, SDK, & API access click Create Access key and note API access key and secret key generated by AWS.

Streaming Data to S3

TimeBase bin directory includes s3replica script that streams data from TimeBase stream into AWS S3 bucket. This utility can run in two modes:

  • Live mode - special daemon service exports data in near real-time.
  • Batch mode - periodic process exports all recently accumulated data in a batch mode.

The main difference between these two modes is that in Batch mode the replication tool stops at the end of the stream, while in Live mode it does not stop and expects more data to come in.
In either mode, if you stop the tool and then restart it, it resumes streaming data from the point where it was interrupted. In addition to storing data, this utility can also control source data retention: it can automatically purge data older than a given number of days from the source TimeBase stream. Note that data is purged only after s3replica has successfully backed it up into S3.

caution

Purging is available in Live mode ONLY.
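The retention rule above can be illustrated with a small sketch. This is not the s3replica implementation; the function name purge_cutoff and its parameters are hypothetical, and the sketch only models the two stated conditions: data must be older than the retention period, and it must already be backed up in S3.

```python
from datetime import datetime, timedelta, timezone


def purge_cutoff(now, retain_days, last_replicated):
    """Return the timestamp before which source data may be purged, or None.

    Hypothetical model: 'last_replicated' is the timestamp of the newest
    record known to be safely stored in S3.
    """
    if retain_days is None or retain_days < 0:
        return None  # purging is disabled when retain is negative or unset
    if last_replicated is None:
        return None  # nothing has been confirmed in S3 yet
    retention_boundary = now - timedelta(days=retain_days)
    # Never purge past the point that has actually been replicated.
    return min(retention_boundary, last_replicated)


now = datetime(2020, 1, 10, tzinfo=timezone.utc)
print(purge_cutoff(now, 7, datetime(2020, 1, 9, tzinfo=timezone.utc)))
```

With a retention of 7 days the cutoff is bounded by the retention period; with a short retention and a lagging replica it is bounded by the last replicated record instead.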

List of command line arguments accepted by s3replica tool:

  • streams <stream1,..,streamN> - Comma-separated list of streams. Required.
  • timebase <timebase_url> - TimeBase URL. By default: dxtick://localhost:8011
  • live - If defined stream will be read in live mode instead of default batch mode.
  • retain <num_of_days> - Number of days TimeBase data should be retained after replication. When negative or not specified, TimeBase data will not be purged.
  • bucket <bucket> - AWS bucket name where stream data will be uploaded. Required.
  • region <region> - AWS bucket region. Required.
  • accessKeyId <access_key_id> - AWS access key ID. Can be specified in environment variable: AWS_ACCESS_KEY_ID - preferred option security-wise
  • accessKey <access_key> - AWS secret access key. Can be specified in environment variable: AWS_SECRET_ACCESS_KEY - preferred option security-wise
  • maxBatchSize <num_of_records> - Max number of records in uploaded batch. Use 0 for no limit. Default 100000.
  • maxBatchTime <period_in_ms> - Max time period in milliseconds spanned by records in uploaded batch. Use 0 for no limit. Default 15 min.
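If you launch the tool from a scheduler or another program, a thin wrapper can assemble the argument list. The following Python sketch is only an illustration: the helper names are hypothetical, it assumes the script is reachable on PATH as s3replica, and it assumes every argument takes a single leading dash, as in the example invocation in this document. Credentials are expected in the environment variables rather than on the command line.

```python
import os
import subprocess


def build_s3replica_args(streams, bucket, region, live=False, retain_days=None,
                         timebase_url=None, max_batch_size=None,
                         max_batch_time_ms=None):
    """Assemble an s3replica command line from the documented arguments."""
    args = ["s3replica", "-streams", ",".join(streams),
            "-bucket", bucket, "-region", region]
    if timebase_url:
        args += ["-timebase", timebase_url]
    if live:
        args.append("-live")
    if retain_days is not None:
        if retain_days >= 0 and not live:
            raise ValueError("purging (-retain) is available in live mode only")
        args += ["-retain", str(retain_days)]
    if max_batch_size is not None:
        args += ["-maxBatchSize", str(max_batch_size)]
    if max_batch_time_ms is not None:
        args += ["-maxBatchTime", str(max_batch_time_ms)]
    return args


def run_s3replica(args):
    """Run the tool, requiring credentials to come from the environment."""
    for name in ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"):
        if name not in os.environ:
            raise RuntimeError(f"{name} is not set")
    return subprocess.run(args, check=True)


print(build_s3replica_args(["balances"], "timebase-s3-test", "us-east-2"))
```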

Example

For this example we will use TimeBase stream "balances" that stores user account balances on crypto exchanges. You can use any other stream that you have in your TimeBase. Just make sure the stream is not empty if you want to see any data appear in S3.

DURABLE STREAM "balances" (
CLASS "deltix.samples.uhf.balance.BalanceMessage" (
"account" VARCHAR,
"amount" FLOAT,
"destination" VARCHAR
);
)
OPTIONS (FIXEDTYPE; PERIODICITY = 'IRREGULAR'; DF = 1; HIGHAVAILABILITY = FALSE)

Let's set AWS access credentials:

export AWS_ACCESS_KEY_ID=AKIA2*******Q
export AWS_SECRET_ACCESS_KEY=2lm8BJ6nZVN7****qKmYdJ+q/DwYUvx2YZ

Run the s3replica tool to export data to S3:

s3replica -streams balances -bucket timebase-s3-test -region us-east-2 
7 Apr 23:31:21.086 WARN [main] "balances" stream does not support spaces
7 Apr 23:31:21.555 INFO [main] Starting replicator for balances
7 Apr 23:31:21.898 INFO [Thread-2] Uploaded 1728 records in 0.183 sec (9442 records per sec) to balances/date=2020-01-07/20-01-12_1578427272543.json.gz

7 Apr 23:31:22.176 INFO [Thread-2] Uploaded 4320 records in 0.278 sec (15539 records per sec) to balances/date=2020-01-07/20-16-21_1578428181351.json.gz
7 Apr 23:31:22.306 INFO [Thread-2] Uploaded 4344 records in 0.129 sec (33674 records per sec) to balances/date=2020-01-07/20-31-26_1578429086468.json.gz
7 Apr 23:31:22.444 INFO [Thread-2] Uploaded 4344 records in 0.137 sec (31708 records per sec) to balances/date=2020-01-07/20-46-31_1578429991581.json.gz
7 Apr 23:31:22.591 INFO [Thread-2] Uploaded 4344 records in 0.147 sec (29551 records per sec) to balances/date=2020-01-07/21-01-36_1578430896690.json.gz
7 Apr 23:31:22.839 INFO [Thread-2] Uploaded 4344 records in 0.248 sec (17516 records per sec) to balances/date=2020-01-07/21-16-41_1578431801801.json.gz
7 Apr 23:31:23.006 INFO [Thread-2] Uploaded 4344 records in 0.167 sec (26011 records per sec) to balances/date=2020-01-07/21-31-46_1578432706910.json.gz
7 Apr 23:31:23.122 INFO [Thread-2] Uploaded 4344 records in 0.116 sec (37448 records per sec) to balances/date=2020-01-07/21-46-52_1578433612224.json.gz
7 Apr 23:31:23.256 INFO [Thread-2] Uploaded 4344 records in 0.134 sec (32417 records per sec) to balances/date=2020-01-07/22-01-57_1578434517346.json.gz
7 Apr 23:31:23.374 INFO [Thread-2] Uploaded 4344 records in 0.118 sec (36813 records per sec) to balances/date=2020-01-07/22-17-02_1578435422456.json.gz
7 Apr 23:31:23.512 INFO [Thread-2] Uploaded 4344 records in 0.138 sec (31478 records per sec) to balances/date=2020-01-07/22-32-07_1578436327549.json.gz
7 Apr 23:31:23.621 INFO [Thread-2] Uploaded 4344 records in 0.109 sec (39853 records per sec) to balances/date=2020-01-07/22-47-12_1578437232675.json.gz
7 Apr 23:31:23.714 INFO [Thread-2] Uploaded 1320 records in 0.093 sec (14193 records per sec) to balances/date=2020-01-07/22-51-47_1578437507708.json.gz
7 Apr 23:31:23.884 INFO [Thread-2] Uploaded 4344 records in 0.17 sec (25552 records per sec) to balances/date=2020-03-03/17-50-05_1583257805176.json.gz
7 Apr 23:31:24.022 INFO [Thread-2] Uploaded 2616 records in 0.137 sec (19094 records per sec) to balances/date=2020-03-03/17-59-10_1583258350252.json.gz
7 Apr 23:31:24.022 INFO [Thread-2] Successfully uploaded 57768 "balances" messages to S3

The following content has been added to timebase-s3-test bucket:

  • Folder balances (which corresponds to our stream name)

  • Sub-folders like date=2020-01-07 which chronologically group messages. This will be used to partition our data in Athena.

  • Each subfolder contains several GZIP-compressed JSON files.

  • If we look inside any of these files, we will find JSON data:

    {
    "type": "deltix.samples.uhf.balance.BalanceMessage",
    "symbol": "BTC",
    "timestamp": "2020-01-07T19:45:33.063Z",
    "amount": "13122.0",
    "destination": "COINBASE"
    }
  • For Java Developers: TimeBase 5.X messages use JSON text format in their toString() and parse().
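To inspect an exported file outside of Athena, you can decompress it and decode the JSON yourself. The sketch below is a hedged example: it assumes each .json.gz object holds a sequence of JSON objects one after another (either one per line or pretty-printed), which this document does not state explicitly. The function name read_messages is hypothetical.

```python
import gzip
import json


def read_messages(raw_bytes):
    """Decode every JSON object found in a gzip-compressed payload."""
    text = gzip.decompress(raw_bytes).decode("utf-8")
    decoder = json.JSONDecoder()
    messages = []
    pos = 0
    while pos < len(text):
        # raw_decode does not skip leading whitespace, so do it here.
        while pos < len(text) and text[pos].isspace():
            pos += 1
        if pos >= len(text):
            break
        message, pos = decoder.raw_decode(text, pos)
        messages.append(message)
    return messages


sample = gzip.compress(
    b'{"symbol": "BTC", "amount": "13122.0", "destination": "COINBASE"}\n'
    b'{"symbol": "ETH", "amount": "1.5", "destination": "KRAKEN"}\n'
)
for message in read_messages(sample):
    print(message["symbol"], message["amount"])
```

For a real object, download it first (for example with the AWS CLI) and pass the file contents read in binary mode.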

Controlling Batch Sizes

The s3replica utility uploads data records in batches as json.gz files. The optional -maxBatchSize and -maxBatchTime command line arguments control how much data is collected, and for how long, before a batch is uploaded to S3. maxBatchSize limits the maximum number of records in a batch and defaults to 100,000. maxBatchTime limits the time period, in milliseconds, spanned by the records in a batch and defaults to 15 minutes. Either limit can be disabled by setting it to 0.
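The interaction of the two limits can be sketched as follows. This is an illustrative model rather than the tool's actual code: the function name is hypothetical, records are assumed to arrive in timestamp order, and the exact boundary handling (whether a batch closes when the limit is reached or exceeded) is an assumption.

```python
def split_into_batches(records, max_batch_size=100_000,
                       max_batch_time_ms=15 * 60 * 1000):
    """Group (timestamp_ms, payload) records into upload batches.

    A batch is closed when it already holds max_batch_size records, or when
    the next record would stretch its time span to max_batch_time_ms or more.
    A limit of 0 disables that check.
    """
    batch = []
    for record in records:
        timestamp_ms = record[0]
        if batch:
            too_many = max_batch_size > 0 and len(batch) >= max_batch_size
            too_long = (max_batch_time_ms > 0
                        and timestamp_ms - batch[0][0] >= max_batch_time_ms)
            if too_many or too_long:
                yield batch
                batch = []
        batch.append(record)
    if batch:
        yield batch


records = [(0, "a"), (1000, "b"), (2000, "c"), (10000, "d")]
print(list(split_into_batches(records, max_batch_size=2, max_batch_time_ms=0)))
print(list(split_into_batches(records, max_batch_size=0, max_batch_time_ms=5000)))
```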

Using TimeBase Spaces

If a replicated TimeBase stream supports spaces, objects stored in S3 will be partitioned by their TimeBase space and their S3 keys will include a space=<space_id> folder: balances/space=1/date=2020-01-07/22-51-47_1578437507708.json.gz. The default space uses an empty string as its space ID, so S3 keys for the default space will look like this: balances/space=/date=2020-01-07/22-51-47_1578437507708.json.gz. Partitioning by space is done to facilitate importing S3 data back into TimeBase. The file name has the form hh-mm-ss_<sequence_id>, taken from the last record in the object.
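When post-processing a bucket listing, it can help to split keys back into their parts. The following sketch parses the key layouts shown in this document; the ReplicaKey type and parse_key function are hypothetical names, and the pattern assumes space IDs contain no slash and are stored unescaped.

```python
import re
from typing import NamedTuple, Optional


class ReplicaKey(NamedTuple):
    stream: str
    space: Optional[str]  # None when the stream does not use spaces
    date: str
    time: str
    sequence_id: int


_KEY_PATTERN = re.compile(
    r"^(?P<stream>[^/]+)/"
    r"(?:space=(?P<space>[^/]*)/)?"
    r"date=(?P<date>\d{4}-\d{2}-\d{2})/"
    r"(?P<time>\d{2}-\d{2}-\d{2})_(?P<seq>\d+)\.json\.gz$"
)


def parse_key(key):
    """Split an S3 object key written by the replication tool into fields."""
    match = _KEY_PATTERN.match(key)
    if match is None:
        raise ValueError(f"unrecognized key layout: {key}")
    return ReplicaKey(match["stream"], match["space"], match["date"],
                      match["time"], int(match["seq"]))


print(parse_key("balances/space=1/date=2020-01-07/22-51-47_1578437507708.json.gz"))
print(parse_key("balances/space=/date=2020-01-07/22-51-47_1578437507708.json.gz"))
```

Note that the default space is returned as an empty string, which is distinct from None for streams without spaces.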

Athena Queries

AWS provides many ways to access S3 data. S3 Select and Athena are the most obvious choices. Here we describe how to access data using Amazon Athena, a service that lets a data analyst run interactive queries in the Amazon Web Services public cloud on data stored in Amazon S3. To run Athena queries on the messages and orders data you exported to S3, follow the steps in the Athena User Guide to create your database and tables. Copy these queries to the Athena Query editor tab, update the database name at the top and the bucket name in the S3 URL at the bottom to match your database and bucket, and run the queries to create your tables.

CREATE DATABASE deltixdb

/* The statement to create balances table:*/

CREATE EXTERNAL TABLE IF NOT EXISTS deltixdb.balances (
`type` string,
`symbol` string,
`timestamp` timestamp,
`amount` decimal,
`destination` string
) PARTITIONED BY (
`date` date
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
'serialization.format' = '1'
) LOCATION 's3://timebase-s3-test/balances/'
TBLPROPERTIES ('has_encrypted_data'='false');

Partitions

Note how the date column is defined. This column is mapped to S3 bucket directories (e.g., "date=2020-01-07"). For Athena to recognize the partitions defined by date, execute the following query:

MSCK REPAIR TABLE balances;

Athena should report the following. Notice how it confirms every discovered partition:

Partitions not in metastore: balances:date=2020-01-07 balances:date=2020-03-03
Repair: Added partition to metastore balances:date=2020-01-07
Repair: Added partition to metastore balances:date=2020-03-03

tip

Every time a new partition is added, it is necessary to re-run the above REPAIR query to make it available for Athena. You can run show partitions <table-name> query to confirm loaded partitions.

Since Athena charges are based on the amount of data scanned, it is recommended to filter on partition keys in your queries so that only the relevant sub-folders are read. See examples below.

Simple Test

The following query returns the number of records discovered by Athena across all your partitions:

select count(*) from balances 

The following query counts records for the last 7 days:

SELECT count(*) 
FROM balances
WHERE date >= current_date - interval '7' day;
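If you generate such queries from code, the partition filter can be built from a date range. The helper below is a minimal sketch with a hypothetical name; it only formats the SQL text, and it assumes the table name comes from a trusted source since it is inserted without escaping.

```python
from datetime import date


def count_query(table, start, end):
    """Build a count query restricted to the date partitions in [start, end]."""
    if start > end:
        raise ValueError("start must not be after end")
    return (
        f"SELECT count(*) FROM {table} "
        f"WHERE date BETWEEN DATE '{start.isoformat()}' "
        f"AND DATE '{end.isoformat()}'"
    )


print(count_query("balances", date(2020, 1, 7), date(2020, 3, 3)))
```

Because the filter is on the partition column, Athena only needs to read objects under the matching date= folders.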

This query lists the distinct account and destination pairs for orders sent to the KRAKEN exchange:

select distinct account, destination 
from orders
where ExchangeId = 'KRAKEN'

Show last 100 orders with their submission timestamp in America/New_York timezone:

select SourceId, DestinationId, OrderId, OpenTime AT TIME ZONE 'America/New_York' as OpenTime from orders
order by OpenTime desc
limit 100

Show last 100 trades:

select *
from messages
where Type ='OrderTradeReportEvent'
order by timestamp desc
limit 100

Athena Views

CREATE OR REPLACE VIEW orders_view AS
SELECT *, averagePrice*cumulativeQuantity as totalValue
FROM orders
WHERE cumulativeQuantity > 0;

Athena views can be useful in data analytics, for example for charting "enriched" orders in Amazon QuickSight.

Restoring Data to TimeBase

TimeBase bin directory includes s3import script that can restore data from S3 back into TimeBase stream.

This tool has three modes:

  1. INSERT - all data will be written to stream.
  2. APPEND - for each symbol only data that appears after the last timestamp will be written.
  3. REPLACE (this is the default mode):
  • Loader will delete data from the given stream with the given time range. If the time range is not specified, then it will use data range from S3 metadata.
  • All loaders will be created with TimeBase WriteMode.INSERT option.

info

Refer to Writing Modes for details.
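The difference between the three modes can be seen in a toy in-memory model. This is a sketch only, not how the loader is implemented: messages are plain (timestamp, symbol, payload) tuples, the function name is hypothetical, the REPLACE range is treated as inclusive, and when no range is given the range of the incoming data stands in for the range read from S3 metadata.

```python
def apply_import(existing, incoming, mode="REPLACE", time_range=None):
    """Return the stream content after importing 'incoming' in the given mode."""
    if mode == "INSERT":
        result = existing + incoming
    elif mode == "APPEND":
        # Last known timestamp per symbol in the target stream.
        last_seen = {}
        for timestamp, symbol, _ in existing:
            last_seen[symbol] = max(timestamp, last_seen.get(symbol, timestamp))
        result = existing + [
            m for m in incoming
            if m[1] not in last_seen or m[0] > last_seen[m[1]]
        ]
    elif mode == "REPLACE":
        if time_range is None and incoming:
            timestamps = [m[0] for m in incoming]
            time_range = (min(timestamps), max(timestamps))
        if time_range is None:
            kept = list(existing)
        else:
            start, end = time_range
            kept = [m for m in existing if not (start <= m[0] <= end)]
        result = kept + incoming
    else:
        raise ValueError(f"unknown import mode: {mode}")
    return sorted(result, key=lambda m: m[0])


existing = [(1, "BTC", "old1"), (5, "BTC", "old5"), (3, "ETH", "old3")]
incoming = [(4, "BTC", "new4"), (6, "BTC", "new6"), (4, "ETH", "new4e")]
for mode in ("INSERT", "APPEND", "REPLACE"):
    print(mode, [m[2] for m in apply_import(existing, incoming, mode)])
```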

Argument Reference

Required

  • stream <stream_key> (must exist and schema must match exported data)
  • bucket <bucket_name>
  • region <aws_region | path> (S3 region OR filesystem path when -file is used)
  • timebase <url> (default dxtick://localhost:8011)
  • accessKeyId <access_key_id> AWS access key ID. Can be specified in environment variable: AWS_ACCESS_KEY_ID.
  • accessKey <access_key> AWS secret access key. Can be specified in environment variable: AWS_SECRET_ACCESS_KEY.

Time Range

  • tf <pattern> Java time pattern (default yyyy-MM-dd). Quote the pattern if it contains spaces or literal text.
  • startTime <value> (format per -tf; default Long.MIN_VALUE)
  • endTime <value> (format per -tf; default Long.MAX_VALUE)

Import Behavior

  • importMode <INSERT | APPEND | REPLACE> (default REPLACE)

Local Mode

  • file Enable filesystem mode; region becomes a path to the root mirror; bucket still used as top-level subdirectory.

Performance

  • channelBufferSize <bytes> Loader channel buffer (default 2097152; min 65536). Increase if loader threads show backpressure.
  • totalCores <n> Soft cap guiding concurrency across all spaces (default = availableProcessors / 2); note that in containers you may need to pass the cgroup-limited core count explicitly.
  • totalMemory <bytes> Soft cap for file prefetch & parse buffering per active space. Default = 75% of the JVM heap (-Xmx). Increase the overall budget primarily by raising -Xmx. Use this flag only to fine‑tune the portion of the heap devoted to prefetched and preparsed files.
  • downloadThreadsPerSpace <n> Forces n download threads per space, disables auto-tune for this stage (min 1).
  • parseThreadsPerSpace <n> Forces n parse workers per space, disables auto-tune for this stage (min 1).
  • disableParallelImport Disable per-space parallel import pipeline. Greatly lowers single-space throughput (multi-space concurrency still active). Ignored when set: totalMemory, downloadThreadsPerSpace, parseThreadsPerSpace. Use when importing many spaces simultaneously and CPU is saturated (reduce per-space resource usage / context switching).

Usage Examples

Basic

  1. Basic full restore (default REPLACE over entire exported range):

s3import.sh -stream orders -bucket orders-replica -region us-east-1 \
-accessKeyId AKIA2*******Q -accessKey 2lm8BJ6nZVN7****qKmYdJ+q/DwYUvx2YZ

Further examples will omit the credentials, assuming they're set via the environment variables. For example:

export AWS_ACCESS_KEY_ID=AKIA2*******Q
export AWS_SECRET_ACCESS_KEY=2lm8BJ6nZVN7****qKmYdJ+q/DwYUvx2YZ

  2. Restore a specific date range (replace only August 2025):

s3import.sh -stream orders -bucket orders-replica -region us-east-1 \
-startTime 2025-08-01 -endTime 2025-09-01

  3. Incremental append of newly exported day:

s3import.sh -stream orders -bucket orders-replica -region us-east-1 \
-startTime 2025-08-24 -importMode APPEND

  4. Specific time:

s3import.sh -stream orders -bucket orders-replica -region us-east-1 \
-tf "yyyy-MM-dd HH:mm" -startTime "2025-08-01 00:00" -endTime "2025-09-01 03:50"

  5. Filesystem mirror (mounted bucket; use path in -region and -file flag):

s3import.sh -file -stream orders -bucket orders-replica \
-region /mnt/s3_mirror -importMode APPEND

High-throughput

Ensure JVM heap (-Xmx) is sized appropriately (default totalMemory = 0.75 * heap; if you override -totalMemory, keep it well below -Xmx).

  1. Single large space, S3, rely on auto-tune (best first step):

s3import.sh -stream orders -bucket orders-replica -region us-east-1 \
-startTime 2025-08-01 -endTime 2025-09-01

  2. Single large space, S3, explicit core/memory to steer auto-tune:

Rationale: when running in containers or VMs with nonstandard limits. Memory raises prefetch buffering; cores raise parallelism.

export JAVA_OPTS="-Xmx5583457485"
s3import.sh -stream orders -bucket orders-replica -region us-east-1 \
-totalCores 16 -startTime 2025-08-01 -endTime 2025-09-01

  3. S3, multiple spaces (e.g., 40 spaces), fine tune memory limit:

Rationale: reduce per-space threads to avoid context switching and S3 throttling.

export JAVA_OPTS="-Xmx22333829940"
s3import.sh -stream orders -bucket orders-replica -region us-east-1 \
-totalCores 64 -totalMemory 17179869184 \
-downloadThreadsPerSpace 4 -parseThreadsPerSpace 2

  4. Local filesystem mirror (mounted), lower download threads:

Rationale: local FS latency is low; fewer download threads suffice. Keep parsing high enough to saturate CPU.

export JAVA_OPTS="-Xmx11166914970"
s3import.sh -file -stream orders -bucket orders-replica \
-region /mnt/s3_mirror -totalCores 32 \
-downloadThreadsPerSpace 2

  5. High‑throughput plus larger loader buffer (only if loader shows backpressure):

export JAVA_OPTS="-Xmx11166914970"
s3import.sh -stream orders -bucket orders-replica -region us-east-1 \
-totalCores 32 -channelBufferSize 16777216

  6. Many small spaces, prioritize breadth over per‑space speed:

Rationale: drastically reduce per-space concurrency or disable per-space parallelism to minimize contention.

s3import.sh -stream orders -bucket orders-replica -region us-east-1 \
-disableParallelImport -totalCores 64

note

Note that the tool automatically limits concurrently active spaces to about totalCores / 2; adjust totalCores to bias this behavior when running under cgroup limits.

tip

For S3, run the importer in the same region as the bucket and on a network-optimized instance family; a higher downloadThreadsPerSpace helps hide S3 latency but is bounded by your CPU cores and memory.

Import Pipeline

This section explains how s3import achieves high throughput and how the tuning flags map to stages.

Overview

Data is imported per TimeBase space. Multiple spaces run concurrently, while each space uses a 3‑stage pipeline: Download -> Parse -> Import. Concurrency per stage is bounded by totalCores and per‑stage thread overrides. Order of messages within each space is preserved through the pipeline.

Concurrency Model

  • Active spaces: auto-limited to about totalCores / 2.
  • Within a space:
    • Download stage: 1..N threads (downloadThreadsPerSpace or auto).
    • Parse stage: 1..N threads (parseThreadsPerSpace or auto).
    • Import stage: exactly 1 thread.
  • disableParallelImport collapses all three stages into the single importer thread (no download/parse workers, lowest footprint).

Pipeline Stages per Space

  1. Download Stage

    • Workers fetch compressed JSON files from S3 or filesystem into the download queue.
    • Memory budget for in-flight files derives from 10% of totalMemory divided across active spaces.
  2. Parse Stage

    • Parser workers take the next file, decompress, parse messages, and push them into a per-file message queue.
    • The memory budget for uncompressed files derives from 70% of totalMemory divided across active spaces.
  3. Import Stage

    • Sequentially drains per-file queues, writing messages to the target stream.
    • When a queue EOF marker is seen, resources for that file are released and the next file begins.
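As a rough aid for sizing, the per-space budgets described above can be estimated with simple arithmetic. The sketch below is an approximation under stated assumptions: the function name is hypothetical, the budgets are split evenly across active spaces, and the number of active spaces is taken as the smaller of the space count and totalCores / 2. The tool's actual rounding and distribution may differ.

```python
def per_space_budgets(total_memory, total_cores, num_spaces=None):
    """Estimate (active_spaces, download_bytes, parse_bytes) per space."""
    active_spaces = max(1, total_cores // 2)
    if num_spaces is not None:
        active_spaces = max(1, min(active_spaces, num_spaces))
    # 10% of totalMemory for in-flight compressed files,
    # 70% for uncompressed files being parsed.
    download_bytes = total_memory * 10 // 100 // active_spaces
    parse_bytes = total_memory * 70 // 100 // active_spaces
    return active_spaces, download_bytes, parse_bytes


print(per_space_budgets(1_000_000_000, 8))
print(per_space_budgets(1_000_000_000, 64, num_spaces=1))
```

With many active spaces the per-space share shrinks quickly, which is one reason to lower per-space threads or raise the heap when importing many spaces at once.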

Interactions and State Exchange

  • Downloaders place completed files into the download queue. When a file is ready, a parser worker picks it up.
  • Parsers read files, parse messages, and enqueue them into per-file message queues. When a file is fully parsed, an EOF marker is enqueued.
  • Importer thread consumes messages from the current file's queue. Upon EOF, it advances to the next queue.
  • State objects (SpaceDownloadState, SpaceState) coordinate resource usage, queue depths, and thread counts.
  • State Transitions (e.g., file EOF, queue exhaustion) trigger dynamic tuning of thread counts and queue capacities.
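The hand-off between stages can be modeled with standard queues and threads. The following Python sketch is a simplified illustration, not the importer's code: files are plain strings with one message per line, queues are unbounded, and there is a single downloader. It shows how per-file queues with EOF markers let one importer thread preserve message order even when several parser workers finish out of order.

```python
import queue
import threading

EOF = object()  # marks the end of one file's message queue


def run_pipeline(files, parse_workers=2):
    """Toy Download -> Parse -> Import pipeline for a single space."""
    if parse_workers < 1:
        raise ValueError("at least one parse worker is required")
    download_queue = queue.Queue()
    # One message queue per file, created in file order so the importer
    # can drain them sequentially regardless of which parser finishes first.
    file_queues = [queue.Queue() for _ in files]
    imported = []

    def downloader():
        for index, payload in enumerate(files):
            download_queue.put((index, payload))
        for _ in range(parse_workers):
            download_queue.put(None)  # tell each parser to stop

    def parser():
        while True:
            item = download_queue.get()
            if item is None:
                return
            index, payload = item
            for line in payload.splitlines():
                file_queues[index].put(line)
            file_queues[index].put(EOF)

    def importer():
        for file_queue in file_queues:
            while True:
                message = file_queue.get()
                if message is EOF:
                    break  # advance to the next file's queue
                imported.append(message)

    threads = [threading.Thread(target=downloader),
               threading.Thread(target=importer)]
    threads += [threading.Thread(target=parser) for _ in range(parse_workers)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return imported


print(run_pipeline(["a1\na2", "b1", "c1\nc2\nc3"], parse_workers=3))
```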

Auto-Tuner Details

The auto-tuner dynamically adjusts the number of download threads and queue sizes for each space during import. It monitors queue depths to optimize resource allocation:

  • Download threads per space are increased or decreased based on how full the download queue is, up to a maximum set by configuration or available CPU cores.
  • Queue capacities for both download and parse stages are recalculated as files are processed, using observed file sizes and the memory budget for each space.
  • Memory allocation: 10% of total memory is reserved for in-flight (compressed) downloads, and 70% for uncompressed files being parsed, divided among active spaces.
  • State transitions (like file completion or queue exhaustion) trigger re-evaluation of thread counts and queue sizes, ensuring efficient use of CPU and memory without manual tuning.
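To make the feedback loop concrete, here is one possible adjustment rule written as a pure function. It is not the tool's algorithm: the function name is hypothetical, and the 30% and 70% thresholds are borrowed from the healthy queue-depth range mentioned in the tuning guidance rather than from the auto-tuner itself.

```python
def next_download_threads(current, queue_depth, queue_capacity, max_threads,
                          low=0.3, high=0.7):
    """Suggest a download thread count from the download queue fill level."""
    if queue_capacity <= 0:
        raise ValueError("queue_capacity must be positive")
    fill = queue_depth / queue_capacity
    if fill < low:
        target = current + 1   # queue is draining: downloads are too slow
    elif fill > high:
        target = current - 1   # queue is backing up: downloads are ahead
    else:
        target = current       # within the healthy range
    return max(1, min(max_threads, target))


print(next_download_threads(2, 1, 10, 8))
print(next_download_threads(4, 9, 10, 8))
```

Calling such a function after each state transition, such as a file completing, gives a step-wise controller that stays within 1..max_threads.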

Performance Tuning Guidance

  1. Start with auto‑tune
  • Run with only stream/bucket/region. Optionally specify totalCores. Adjust memory primarily via -Xmx; override -totalMemory only for fine tuning (the default is 75% of the heap).
  2. Observe
  • CPU utilization: aim for 80–90% during steady state.
  • Downloaded files queue depth (per space): healthy range ~30–70% of its cap.
  • Parse queue depth (per space): steady small buffer is good; constant near-cap means parsing is outpacing load handoff.
  3. Tune in this order
  • Network/IO (downloadThreadsPerSpace):
    • Increase until download queue stays in the 30–70% range or S3 throttling/retries rise. For S3, values > parseThreads are normal; for file mode, keep smaller.
    • If the download queue is often >90% full, reduce downloadThreadsPerSpace or increase totalMemory. Prefer reducing threads first to avoid memory spikes.
  • CPU next (parseThreadsPerSpace):
    • Increase until CPU nears 85–90% with minimal run-to-run variance.
    • If parse queue is near 100% while download queue is ≤60%, reduce parseThreadsPerSpace (parsers are getting too far ahead).
  • Memory (Xmx, optional totalMemory):
    • Primary control is JVM heap size (-Xmx). Default totalMemory = 75% of heap.
    • Increase -Xmx if queues starve downloaders/parsers because the staging memory cap is too small.
    • Override -totalMemory only to change that fraction.
    • If heap usage regularly exceeds ~80%, first reduce per-space concurrency or lower a custom -totalMemory before shrinking heap.
    • When overriding, keep Xmx >= totalMemory * 1.3 for headroom.
  4. Many spaces in parallel
  • Prefer lowering per-space threads over raising totalCores.
  • If per-space throughput is still too low, consider running fewer spaces concurrently (reduce totalCores so the tool activates fewer spaces), or run multiple importers on separate hosts.
  5. Loader buffer
  • Only increase channelBufferSize if loader has backpressure. Start with 2–4x the default, verify improvement, and ensure heap headroom remains.
  6. Source-specific
  • S3:
    • Use instances in the same region.
    • Larger totalMemory helps hide S3 latency by allowing more files in memory.
  • Local filesystem:
    • Keep downloadThreadsPerSpace small and push parseThreadsPerSpace to saturate CPU.
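The heap headroom rule (Xmx >= totalMemory * 1.3) and the default fraction (75% of the heap) are easy to compute ahead of time. The helpers below are a small sketch with hypothetical names; they use integer arithmetic so the result can be passed to -Xmx as a byte count.

```python
GIB = 1024 ** 3


def min_heap_for_total_memory(total_memory_bytes):
    """Smallest -Xmx (bytes) that keeps 1.3x headroom over totalMemory."""
    # Ceiling of total_memory_bytes * 1.3 without floating point.
    return -(-total_memory_bytes * 13 // 10)


def default_total_memory(heap_bytes):
    """Default totalMemory: 75% of the JVM heap."""
    return heap_bytes * 3 // 4


print(min_heap_for_total_memory(16 * GIB))  # 22333829940
print(default_total_memory(4 * GIB))        # 3221225472
```

For 16 GiB of totalMemory this gives 22333829940 bytes, the same -Xmx value that accompanies -totalMemory 17179869184 in the usage examples.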

Performance

The following performance was observed on standard S3 storage class (fastest) using a compressed JSON format.

Upload Performance

Stress test was used to generate a realistic sample of trading activity. S3 data warehouse pipeline peaked at around 15000 orders/second (which also produced around 45000 trading messages/second).

Query Performance

Test dataset:

  • 10 GB of compressed JSON in orders (roughly 300 million orders)
  • 30 GB of compressed JSON in messages (roughly 850 million messages)

Querying a typical table with orders took 2 minutes 15 seconds, while querying a table with messages took about 11 minutes. Partitioning data into sub-folders according to typical use cases may drastically improve these query times.

Import Performance

Hardware: 2.3 GHz Quad‑Core Intel Core i7, 32 GB 3733 MHz RAM.

  • With parallel pipeline enabled, importing 24,850,818 messages from a single space:
    • Local filesystem (upper bound): ~1 minute 22 seconds (≈311,000 messages/second) using 4 download threads and 6 parse threads with 1 GiB memory limit.
    • S3 (standard storage): ~1 minute 33 seconds (≈284,000 messages/second) using 8 download threads and 6 parse threads with 1 GiB memory limit.
  • Throughput scales with additional parallel workers: on S3, increasing download threads from 4 to 8 (with 6 parse threads) raised the average rate from ≈193,000 to ≈284,000 messages/second, with diminishing returns as CPU and network limits are approached.

IAM Permissions

The following IAM policy can be attached to an IAM user (if you use API keys) or role to grant access to the S3 bucket 'timebase-s3-test':

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:ListBucket",
        "s3:GetBucketLocation"
      ],
      "Resource": "arn:aws:s3:::timebase-s3-test"
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:PutObject",
        "s3:PutObjectAcl",
        "s3:GetObject",
        "s3:GetObjectAcl",
        "s3:DeleteObject"
      ],
      "Resource": "arn:aws:s3:::timebase-s3-test/*"
    }
  ]
}
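If you manage several buckets, the same policy document can be generated for any bucket name. The function below is a minimal sketch with a hypothetical name; it reproduces the statements shown above and only substitutes the bucket name into the two resource ARNs.

```python
import json


def s3_bucket_policy(bucket):
    """Build the read/write policy document for a single bucket."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # Bucket-level actions apply to the bucket ARN itself.
                "Effect": "Allow",
                "Action": ["s3:ListBucket", "s3:GetBucketLocation"],
                "Resource": f"arn:aws:s3:::{bucket}",
            },
            {
                # Object-level actions apply to keys inside the bucket.
                "Effect": "Allow",
                "Action": [
                    "s3:PutObject",
                    "s3:PutObjectAcl",
                    "s3:GetObject",
                    "s3:GetObjectAcl",
                    "s3:DeleteObject",
                ],
                "Resource": f"arn:aws:s3:::{bucket}/*",
            },
        ],
    }


print(json.dumps(s3_bucket_policy("timebase-s3-test"), indent=2))
```

The printed JSON can be pasted into the IAM console policy editor or saved to a file for use with your provisioning tooling.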