S3/Athena

Amazon Simple Storage Service (Amazon S3) is a scalable, high-speed, web-based cloud storage service designed for online backup, data archiving, and applications on Amazon Web Services. Once you set up data streaming into S3, you can use serverless services like Athena and QuickSight to run data analytics.

This document describes how to move data between TimeBase and S3.

Configuration

Create an S3 bucket using the AWS Management Console as described in the AWS S3 Guide. You will need the bucket name and region for the Execution Server configuration. We recommend creating a separate AWS user, timebase-s3-loader, to be used by the Execution Server to load data into the S3 bucket. This user should only be granted read and write permissions for this bucket (see the IAM Permissions section below for an example policy). To generate access keys:

  1. Log in to AWS Management Console.
  2. Click on the username at the top.
  3. Select My Security Credentials.
  4. Under Access keys for CLI, SDK, & API access, click Create access key and note the access key ID and secret access key generated by AWS (an AWS CLI alternative is sketched below).
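
If you prefer the AWS CLI, the recommended timebase-s3-loader user and its access key could be created along these lines (a sketch; the user name comes from the recommendation above, and the AccessKeyId/SecretAccessKey in the output correspond to the keys noted in step 4):

aws iam create-user --user-name timebase-s3-loader
aws iam create-access-key --user-name timebase-s3-loader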

Streaming Data to S3

The TimeBase bin directory includes the s3replica script, which streams data from a TimeBase stream into an AWS S3 bucket. This utility can run in two modes:

  • Live mode - a special daemon service exports data in near real-time.
  • Batch mode - a periodic process exports all recently accumulated data in batches.

The main difference between these two modes is that in Batch mode the replication tool stops at the end of the stream, while in Live mode it does not stop and expects more data to come in. In either mode, if you stop the tool and then restart it, it will resume streaming data from the point where it was interrupted. In addition to storing data, this utility can also control source data retention: you can automatically purge data that is older than a given number of days from the source TimeBase stream. Note that this purge happens only after the data has been successfully backed up to S3.

caution

Purging is available in Live mode ONLY.

List of command line arguments accepted by the s3replica tool (an example invocation follows the list):

  • streams <stream1,..,streamN> - Comma-separated list of streams. Required.
  • timebase <timebase_url> - TimeBase URL. By default: dxtick://localhost:8011
  • live - If defined, the stream will be read in live mode instead of the default batch mode.
  • retain <num_of_days> - Number of days TimeBase data should be retained after replication. When negative or not specified, TimeBase data will not be purged.
  • bucket <bucket> - AWS bucket name where stream data will be uploaded. Required.
  • region <region> - AWS bucket region. Required.
  • accessKeyId <access_key_id> - AWS access key ID. Can also be specified via the AWS_ACCESS_KEY_ID environment variable (preferred for security).
  • accessKey <access_key> - AWS secret access key. Can also be specified via the AWS_SECRET_ACCESS_KEY environment variable (preferred for security).
  • maxBatchSize <num_of_records> - Max number of records in uploaded batch. Use 0 for no limit. Default 100000.
  • maxBatchTime <period_in_ms> - Max time period in milliseconds spanned by records in uploaded batch. Use 0 for no limit. Default 15 min.
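
For example, the following command (with illustrative bucket and region values) replicates the balances stream in live mode and purges source data older than 30 days once it has been uploaded to S3:

s3replica -streams balances -timebase dxtick://localhost:8011 -live -retain 30 -bucket timebase-s3-test -region us-east-2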

Example

For this example we will use the TimeBase stream "balances", which stores user account balances on crypto exchanges. You can use any other stream that you have in your TimeBase instance; just make sure the stream is not empty if you want to see data appear in S3.

DURABLE STREAM "balances" (
    CLASS "deltix.samples.uhf.balance.BalanceMessage" (
        "account" VARCHAR,
        "amount" FLOAT,
        "destination" VARCHAR
    )
)
OPTIONS (FIXEDTYPE; PERIODICITY = 'IRREGULAR'; DF = 1; HIGHAVAILABILITY = FALSE)

Let's set AWS access credentials:

export AWS_ACCESS_KEY_ID=AKIA2*******Q
export AWS_SECRET_ACCESS_KEY=2lm8BJ6nZVN7****qKmYdJ+q/DwYUvx2YZ

Run the s3replica tool to upload data to S3:

s3replica -streams balances -bucket timebase-s3-test -region us-east-2 
7 Apr 23:31:21.086 WARN [main] "balances" stream does not support spaces
7 Apr 23:31:21.555 INFO [main] Starting replicator for balances
7 Apr 23:31:21.898 INFO [Thread-2] Uploaded 1728 records in 0.183 sec (9442 records per sec) to balances/date=2020-01-07/20-01-12_1578427272543.json.gz

7 Apr 23:31:22.176 INFO [Thread-2] Uploaded 4320 records in 0.278 sec (15539 records per sec) to balances/date=2020-01-07/20-16-21_1578428181351.json.gz
7 Apr 23:31:22.306 INFO [Thread-2] Uploaded 4344 records in 0.129 sec (33674 records per sec) to balances/date=2020-01-07/20-31-26_1578429086468.json.gz
7 Apr 23:31:22.444 INFO [Thread-2] Uploaded 4344 records in 0.137 sec (31708 records per sec) to balances/date=2020-01-07/20-46-31_1578429991581.json.gz
7 Apr 23:31:22.591 INFO [Thread-2] Uploaded 4344 records in 0.147 sec (29551 records per sec) to balances/date=2020-01-07/21-01-36_1578430896690.json.gz
7 Apr 23:31:22.839 INFO [Thread-2] Uploaded 4344 records in 0.248 sec (17516 records per sec) to balances/date=2020-01-07/21-16-41_1578431801801.json.gz
7 Apr 23:31:23.006 INFO [Thread-2] Uploaded 4344 records in 0.167 sec (26011 records per sec) to balances/date=2020-01-07/21-31-46_1578432706910.json.gz
7 Apr 23:31:23.122 INFO [Thread-2] Uploaded 4344 records in 0.116 sec (37448 records per sec) to balances/date=2020-01-07/21-46-52_1578433612224.json.gz
7 Apr 23:31:23.256 INFO [Thread-2] Uploaded 4344 records in 0.134 sec (32417 records per sec) to balances/date=2020-01-07/22-01-57_1578434517346.json.gz
7 Apr 23:31:23.374 INFO [Thread-2] Uploaded 4344 records in 0.118 sec (36813 records per sec) to balances/date=2020-01-07/22-17-02_1578435422456.json.gz
7 Apr 23:31:23.512 INFO [Thread-2] Uploaded 4344 records in 0.138 sec (31478 records per sec) to balances/date=2020-01-07/22-32-07_1578436327549.json.gz
7 Apr 23:31:23.621 INFO [Thread-2] Uploaded 4344 records in 0.109 sec (39853 records per sec) to balances/date=2020-01-07/22-47-12_1578437232675.json.gz
7 Apr 23:31:23.714 INFO [Thread-2] Uploaded 1320 records in 0.093 sec (14193 records per sec) to balances/date=2020-01-07/22-51-47_1578437507708.json.gz
7 Apr 23:31:23.884 INFO [Thread-2] Uploaded 4344 records in 0.17 sec (25552 records per sec) to balances/date=2020-03-03/17-50-05_1583257805176.json.gz
7 Apr 23:31:24.022 INFO [Thread-2] Uploaded 2616 records in 0.137 sec (19094 records per sec) to balances/date=2020-03-03/17-59-10_1583258350252.json.gz
7 Apr 23:31:24.022 INFO [Thread-2] Successfully uploaded 57768 "balances" messages to S3

The following content has been added to the timebase-s3-test bucket:

  • Folder balances (which corresponds to our stream name)

  • Sub-folders like date=2020-01-07, which group messages chronologically. These will be used to partition our data in Athena.

  • Each sub-folder contains several GZIP-compressed JSON files.

  • If we look inside any of these files (for example, by streaming one through gunzip as shown after this list), we will find JSON data:

    {
    "type": "deltix.samples.uhf.balance.BalanceMessage",
    "symbol": "BTC",
    "timestamp": "2020-01-07T19:45:33.063Z",
    "amount": "13122.0",
    "destination": "COINBASE"
    }
  • For Java developers: TimeBase 5.X messages use this JSON text format in their toString() and parse() methods.
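
One way to inspect a file directly from S3 is to stream it through the AWS CLI and gunzip (the key below is one of the objects uploaded in the example above):

aws s3 cp s3://timebase-s3-test/balances/date=2020-01-07/20-01-12_1578427272543.json.gz - | gunzip | head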

Controlling Batch Sizes

The s3replica utility uploads data records in batches as .json.gz files. How much data is collected, and for how long, before a batch is uploaded to S3 can be controlled with the optional -maxBatchSize and -maxBatchTime command line arguments: maxBatchSize limits the maximum number of records in a batch (100,000 by default), and maxBatchTime limits the time period in milliseconds spanned by the batch records (15 minutes by default). Either limit can be disabled by setting it to 0.
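
For example, a run that caps each batch at 50,000 records or 5 minutes (values chosen purely for illustration) would look like this:

s3replica -streams balances -bucket timebase-s3-test -region us-east-2 -maxBatchSize 50000 -maxBatchTime 300000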

Using TimeBase Spaces

If a replicated TimeBase stream supports spaces, objects stored in S3 will be partitioned by their TimeBase space, and their S3 keys will include a space=<space_id> folder: balances/space=1/date=2020-01-07/22-51-47_1578437507708.json.gz. The default space uses an empty string as its space ID, so S3 keys for the default space will look like this: balances/space=/date=2020-01-07/22-51-47_1578437507708.json.gz. Partitioning by space is done to facilitate importing S3 data back into TimeBase. The file name consists of hh-mm-ss_<sequence_id>, taken from the last record in the relevant object.

Athena Queries

AWS provides many ways to access S3 data; S3 Select and Athena are the most obvious choices. Here we will describe how to access data using Amazon Athena, a service that enables a data analyst to perform interactive queries in the Amazon Web Services public cloud on data stored in Amazon S3. To run Athena queries on the data you exported to S3, follow the steps in the Athena User Guide to create your database and tables: copy the queries below into the Athena Query editor tab, update the database name at the top and the bucket name in the S3 URL at the bottom to match your database and bucket, and run the queries to create your tables.

CREATE DATABASE deltixdb

/* The statement to create the balances table: */

CREATE EXTERNAL TABLE IF NOT EXISTS deltixdb.balances (
  `type` string,
  `symbol` string,
  `timestamp` timestamp,
  `amount` decimal,
  `destination` string
)
PARTITIONED BY (
  `date` date
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = '1'
)
LOCATION 's3://timebase-s3-test/balances/'
TBLPROPERTIES ('has_encrypted_data'='false');

Partitions

Please note how the date column is defined. This column is mapped to S3 bucket directories (e.g., "date=2020-01-07"). To make the partitions defined by date recognized by Athena, execute the following query:

MSCK REPAIR TABLE balances;

Athena should output the following. Notice how it confirms every discovered partition:

Partitions not in metastore: balances:date=2020-01-07 balances:date=2020-03-03
Repair: Added partition to metastore balances:date=2020-01-07
Repair: Added partition to metastore balances:date=2020-03-03
tip

Every time a new partition is added, it is necessary to re-run the above REPAIR query to make it available to Athena. You can run a show partitions <table-name> query to confirm the loaded partitions.
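
For example:

SHOW PARTITIONS balances;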

Since Athena charges are based on the amount of S3 data scanned, it is recommended to use partition keys in your queries. See the examples below.

Simple Test

The following query returns all records discovered by Athena in all your partitions:

select count(*) from balances 

The following query returns records for the last 7 days:

SELECT count(*) 
FROM balances
WHERE date >= current_date - interval '7' day;
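
Because the date partition column maps directly to the S3 folders, restricting a query to specific partitions reduces the amount of data Athena scans. For example, using one of the partitions created above:

SELECT count(*)
FROM balances
WHERE date = date '2020-01-07';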

This query shows the distinct account and destination combinations for orders sent to the KRAKEN exchange:

select distinct account, destination 
from orders
where ExchangeId = 'KRAKEN'

Show last 100 orders with their submission timestamp in America/New_York timezone:

select SourceId, DestinationId, OrderId, OpenTime AT TIME ZONE 'America/New_York' as OpenTime from orders
order by OpenTime desc
limit 100

Show last 100 trades:

select *
from messages
where Type ='OrderTradeReportEvent'
order by timestamp desc
limit 100

Athena Views

Athena Views can be useful in data analytics, for example for charting "enriched" orders in Amazon QuickSight. The following view adds a computed totalValue column to orders with a non-zero cumulative quantity:

CREATE OR REPLACE VIEW orders_view AS
SELECT *, averagePrice*cumulativeQuantity as totalValue
FROM orders
WHERE cumulativeQuantity > 0;
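
Once created, the view can be queried like a regular table, for example (an illustrative query using the OrderId column from the earlier order queries):

SELECT OrderId, totalValue
FROM orders_view
ORDER BY totalValue DESC
LIMIT 10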

Performance

The following performance was observed on the standard S3 storage class (the fastest) using compressed JSON format.

Upload Performance

A stress test was used to generate a realistic sample of trading activity. The S3 data warehouse pipeline peaked at around 15,000 orders/second (which also produced around 45,000 trading messages/second).

Query Performance

Test dataset:

  • 10 GB of compressed JSON in orders (roughly 300 million orders)
  • 30 GB of compressed JSON in messages (roughly 850 million messages)

Querying a typical table with orders took 2 minutes 15 seconds, while querying a table with messages took about 11 minutes. Partitioning data into sub-folders according to typical use cases may drastically improve these query times.

Restoring Data to TimeBase

The TimeBase bin directory includes the s3import script, which can restore data from S3 back into a TimeBase stream.

Example

s3import.cmd -stream GDAX -timebase dxtick://localhost:8192 -tf yyyy-MM-dd -startTime 2020-02-17 -bucket gdax-replica-42 -region eu-west-3 -importMode APPEND

This tool has three modes:

  1. INSERT - all data will be written to the stream.
  2. APPEND - for each symbol, only data with timestamps after the last existing timestamp will be written.
  3. REPLACE (this is the default mode):
    • The loader will delete data from the given stream within the given time range. If the time range is not specified, the time range from the S3 metadata will be used.
    • All loaders will be created with the TimeBase WriteMode.INSERT option.
info

Refer to Writing Mode for details.

The following arguments can be defined by users (an illustrative invocation with an explicit time range follows the list):

  • stream <stream_key> - stream with a matching schema into which data will be imported. Required.
  • timebase <timebase_url> - TimeBase URL. By default: dxtick://localhost:8011.
  • tf <time_format> - Time format (Java-style). Example: "yyyy.MM.dd'T'HH:mm:ss z". By default: "yyyy-MM-dd".
  • startTime <start_time> - Start time in date-time format according to -tf argument. By default Long.MIN_VALUE.
  • endTime <end_time> - End time in date-time format according to -tf argument. By default Long.MAX_VALUE.
  • importMode <import_mode> - INSERT|APPEND|REPLACE. By default REPLACE.
  • bucket - AWS bucket name where stream data is stored. Required.
  • region - AWS bucket region. Required.
  • accessKeyId <access_key_id> - AWS access key ID. Can be specified in environment variable: AWS_ACCESS_KEY_ID.
  • accessKey <access_key> - AWS secret access key. Can be specified in environment variable: AWS_SECRET_ACCESS_KEY.
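
For instance, an import that replaces a single day of the balances data (illustrative stream, bucket, and time-range values) might look like this:

s3import -stream balances -timebase dxtick://localhost:8011 -tf yyyy-MM-dd -startTime 2020-01-07 -endTime 2020-01-08 -bucket timebase-s3-test -region us-east-2 -importMode REPLACE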

IAM Permissions

The following IAM policy can be attached to an IAM user (if you use API keys) or role to grant access to the S3 bucket 'timebase-s3-test':

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:ListBucket",
        "s3:GetBucketLocation"
      ],
      "Resource": "arn:aws:s3:::timebase-s3-test"
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:PutObject",
        "s3:PutObjectAcl",
        "s3:GetObject",
        "s3:GetObjectAcl",
        "s3:DeleteObject"
      ],
      "Resource": "arn:aws:s3:::timebase-s3-test/*"
    }
  ]
}
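
If you manage IAM from the command line, the policy above could be attached to the recommended timebase-s3-loader user with something along these lines (a sketch; the policy name and the timebase-s3-policy.json file are hypothetical):

aws iam put-user-policy --user-name timebase-s3-loader --policy-name timebase-s3-access --policy-document file://timebase-s3-policy.json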