
Kafka

Overview

Kafka Connect is a framework for streaming data into or out of Apache Kafka®. This document explains how to use the Kafka Connect connector for TimeBase. The connector imports data from a TimeBase stream into a Kafka topic (the Source Connector) or exports data in the opposite direction (the Sink Connector). Refer to Kafka resources to learn more about the Kafka Connect technology.

Installation

The plugin is distributed as part of a Kafka Connect Docker image or as a JAR file, for example timebase-kafka-connector-0.7.1-all.jar. When installing the plugin JAR, add it to the classpath of the Kafka Connect worker (via the worker's plugin.path property). The connector can run in standalone or distributed mode (more on this below).
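For example, if the plugin JAR was copied to /opt/kafka-connect-tb/lib (a hypothetical location), the worker configuration would include:

plugin.path=/opt/kafka-connect-tb/lib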

  • The Source Connector class is called deltix.kafka.connect.TBSourceConnector.
  • The Sink Connector class is called deltix.kafka.connect.TBSinkConnector.

Source Connector

The TimeBase Source Connector copies data from the specified TimeBase stream to a Kafka topic. It supports the following config properties (an example combining the optional field-mapping properties follows the list):

  • timebase.url - TimeBase server URL.
  • timebase.user - TimeBase username.
  • timebase.pwd - TimeBase user password.
  • timebase.stream - Source stream name.
  • timebase.message.id - Optional name of an INT64 field containing a unique message ID. Used to keep track of the last recorded message when the plugin is restarted. When omitted, the message timestamp and a per-timestamp message counter are used instead.
  • timebase.message.type - TimeBase message type name. Used to select the message type in polymorphic TimeBase streams. This parameter is required only when replicating polymorphic streams.
  • topic - Kafka target topic the data is copied to.
  • instrument.field - Optional Kafka topic field for storing TimeBase message Instrument Type.
  • symbol.field - Optional Kafka topic field for storing TimeBase message Symbol.
  • time.field - Optional Kafka topic field for storing TimeBase message Time.
  • key.fields - Optional comma-separated list of TimeBase message fields used as the record key in the Kafka topic. The key string is created by concatenating the field values in the specified order with ‘_’ as a separator.
  • include.fields - Optional comma-separated list of stream fields. When specified, only these fields will be copied to Kafka.
  • exclude.fields - Optional comma-separated list of stream fields that should not be copied to Kafka.
  • rename.fields - Optional comma-separated list of top-level field names and their aliases (new names to be used in Kafka). The original name and alias must be separated by ‘:’. For instance, the property to rename the business_time field to time and the business_operation field to operation looks like this: rename.fields=business_time:time,business_operation:operation.
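As an illustrative sketch only, a Source Connector configuration combining these field-mapping properties might look as follows; the stream, topic, and field names (trades, kafka-trades, order_id, px, qty) are hypothetical:

name=tb-source-field-mapping-example
connector.class=deltix.kafka.connect.TBSourceConnector
tasks.max=1
timebase.url=dxtick://localhost:8011
timebase.stream=trades
topic=kafka-trades
# Kafka record key becomes <symbol>_<order_id>
key.fields=symbol,order_id
# copy only these stream fields to Kafka
include.fields=symbol,order_id,px,qty,business_time
# publish business_time under the name time
rename.fields=business_time:time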

Sink Connector

The TimeBase Sink Connector copies data from the specified Kafka topic to a TimeBase stream. It supports the following config properties (an example combining flatten.fields and rename.fields follows the list):

  • timebase.url - TimeBase server URL.
  • timebase.user - TimeBase username.
  • timebase.pwd - TimeBase user password.
  • timebase.stream - Destination stream name.
  • timebase.message.id - Optional TimeBase message field for storing Kafka record offset.
  • timebase.message.key - Optional TimeBase message field for storing Kafka record key.
  • topics - Kafka topic from which data will be copied. The plugin supports copying from a single topic only.
  • instrument.field - Optional Kafka topic field to copy to TimeBase message Instrument Type.
  • symbol.field - Optional Kafka topic field to copy to TimeBase message Symbol.
  • time.field - Optional Kafka topic INT64 field to copy to TimeBase message Time.
  • include.fields - Optional comma-separated list of top-level topic fields. When specified, only these fields will be copied to TimeBase.
  • exclude.fields - Optional comma-separated list of top-level topic fields that should not be copied to TimeBase.
  • timestamp.fields - Optional comma-separated list of INT64 fields to be interpreted as the Avro logical type timestamp-millis (TimeBase DateTimeDataType).
  • flatten.fields - Optional comma-separated list of top-level Struct fields whose attributes should be written as top-level attributes to TimeBase. Flattened field names contain a ‘.’-separated field name path. For instance, the business_time field of the datahub_operation_metadata struct field will appear at the top level as datahub_operation_metadata.business_time.
  • rename.fields - Optional comma-separated list of top-level field names and their aliases (new names to be used in TimeBase). The original name and alias must be separated by ‘:’. For instance, continuing the example above where datahub_operation_metadata is flattened, the business_time field can be renamed to the TimeBase field bt with: rename.fields=datahub_operation_metadata.business_time:bt.
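As an illustrative sketch only, a Sink Connector configuration combining flatten.fields and rename.fields might look as follows; the topic and stream names (kafka-operations, operations) are hypothetical:

name=tb-sink-flatten-example
connector.class=deltix.kafka.connect.TBSinkConnector
tasks.max=1
timebase.url=dxtick://localhost:8011
timebase.stream=operations
topics=kafka-operations
# write attributes of the datahub_operation_metadata struct as top-level TimeBase fields
flatten.fields=datahub_operation_metadata
# store the flattened business_time attribute in the TimeBase field bt
rename.fields=datahub_operation_metadata.business_time:bt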

Transformations

The Kafka Connect framework supports Single Message Transformations (SMTs) that are applied to messages as they flow through Connect. In the TimeBase-to-Kafka case, SMTs transform messages after the Source Connector has produced them but before they are written to Kafka. In the Kafka-to-TimeBase case, SMTs transform Kafka messages before they are passed to the Sink Connector.

Kafka Connect provides a number of SMTs. Some of these transformations overlap with the field-mapping features supported by the plugins. Transformations are specified in the same connector properties file, and multiple transformations can be chained together. See examples here for more information on how to use them.

Below is an example of transformation properties that convert Avro logical date fields to strings before the Sink Connector writes them to TimeBase. Without these transformations, the hire_date and dismissal_date fields would be written as integers. In this case two transformations (convert_hd and convert_dd) are chained together:

transforms=convert_hd,convert_dd 

transforms.convert_hd.field=hire_date
transforms.convert_hd.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.convert_hd.format=yyyy-MM-dd
transforms.convert_hd.target.type=string

transforms.convert_dd.field=dismissal_date
transforms.convert_dd.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.convert_dd.format=yyyy-MM-dd
transforms.convert_dd.target.type=string

Deploying Plugins in Standalone Mode

To run connectors in standalone mode, start a worker using the Kafka connect-standalone script:

bin/connect-standalone worker.properties \ 
connector1.properties connector2.properties connector3.properties ...

Here is a sample worker.properties file with the Kafka Connect worker configuration that uses Avro serialization and integrates with the Confluent Platform Schema Registry:

bootstrap.servers=localhost:9092 
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
offset.storage.file.filename=/tmp/connect.offsets
plugin.path=/kafka-connect-tb/lib

A sample standalone TimeBase Source Connector configuration file:

name=tb-source-connector 

connector.class=deltix.kafka.connect.TBSourceConnector
tasks.max=1
timebase.url=dxtick://localhost:8011
timebase.user=testuser
timebase.pwd=testpwd
timebase.stream=messages
timebase.message.id=sequence
topic=tb-messages
instrument.field=instrument_type
symbol.field=symbol
time.field=time

A sample standalone TimeBase Sink Connector configuration file:

name=tb-sink-connector 
connector.class=deltix.kafka.connect.TBSinkConnector
tasks.max=1
timebase.url=dxtick://localhost:8011
timebase.user=testuser
timebase.pwd=testpwd
timebase.stream=messages
timebase.message.id=sequence
topics=tb-messages
instrument.field=instrument_type
symbol.field=symbol
time.field=time

Deploying Plugins in Distributed Mode

To run plugins in distributed mode, start one or more workers, either with the Kafka connect-distributed script or in Docker using the Confluent Platform Kafka Connect Docker image. Here is an example of a command that runs a worker in a Docker container:

# example for your reference

docker run -d \
--name="kafka-tbconnect" \
--net=host \
-e CONNECT_BOOTSTRAP_SERVERS=kafka_server:port \
-e CONNECT_REST_PORT=8083 \
-e CONNECT_GROUP_ID="tbconnect" \
-e CONNECT_CONFIG_STORAGE_TOPIC="tbconnect.docker-connect-configs" \
-e CONNECT_OFFSET_STORAGE_TOPIC="tbconnect.docker-connect-offsets" \
-e CONNECT_STATUS_STORAGE_TOPIC="tbconnect.docker-connect-status" \
-e CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1 \
-e CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1 \
-e CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1 \
-e CONNECT_KEY_CONVERTER="org.apache.kafka.connect.storage.StringConverter" \
-e CONNECT_VALUE_CONVERTER="io.confluent.connect.avro.AvroConverter" \
-e CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL="URL:port" \
-e CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL="URL:port" \
-e CONNECT_REST_ADVERTISED_HOST_NAME="localhost" \
-e CONNECT_LOG4J_ROOT_LOGLEVEL=INFO \
-e CONNECT_PLUGIN_PATH=/usr/share/java,/var/customlib \
-v /home/user/kafka-connect-tb/lib:/var/customlib \
confluentinc/cp-kafka-connect:latest

All workers in the same group must share the group ID and the Kafka topics used for storing config, offset, and status data. CONNECT_PLUGIN_PATH includes the /var/customlib folder, which contains the TimeBase plugin JAR mounted from the host.

After the worker is launched, it starts a web server on the specified CONNECT_REST_PORT that provides a REST API for managing connectors. The command to add a TimeBase Source Connector looks like this:

curl -X POST -H "Content-Type: application/json" --data @kafka-connect-tb/config/connector_TBSourceAppActivity_config.json http://$CONNECT_HOST:8083/connectors

where the connector_TBSourceAppActivity_config.json file contains the connector configuration in JSON format:

// example for your reference
{
  "name": "TBSourceAppActivity",
  "config": {
    "connector.class": "deltix.kafka.connect.TBSourceConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",
    "timebase.url": "dxtick://10.6.220.90:8011",
    "timebase.stream": "application_activity",
    "timebase.message.id": "",
    "topic": "tbconnect.application-activity-extended",
    "instrument.field": "instrument_type",
    "symbol.field": "symbol",
    "time.field": "time",
    "value.converter.schema.registry.url": "URL:port"
  }
}

Command to check the connector status:

curl -s -X GET http://$CONNECT_HOST:8083/connectors/TBSourceAppActivity/status 

Security Settings

With security enabled on the Kafka server, the configuration needs to include additional security settings:

group.id=<kafka-group-id> 
ssl.truststore.location=<jks-truststore-file>
ssl.truststore.password=<jks-truststore-password>
security.protocol=SASL_SSL
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
useKeyTab=true \
principal="<principal-name>@xxx.com" \
keyTab="<keytab-file>" \
storeKey=true;

For a Sink Connector, this set of properties also needs to be specified with an additional consumer. prefix in each property name; for a Source Connector, use the producer. prefix. For instance: consumer.group.id. In addition, before running the connect-standalone or connect-distributed worker script, set the KAFKA_OPTS environment variable to include these Kerberos JVM properties:

export KAFKA_OPTS="-Djava.security.krb5.realm=xxx.com -Djava.security.krb5.kdc=xxx.com -Djava.security.auth.login.config=/home/centos/tbconnect/config/kafka_client_jaas.conf -Dsun.security.krb5.debug=false"
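For example, following the prefix rule above, a Sink Connector configuration would repeat the security settings with the consumer. prefix:

consumer.group.id=<kafka-group-id>
consumer.ssl.truststore.location=<jks-truststore-file>
consumer.ssl.truststore.password=<jks-truststore-password>
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=GSSAPI
consumer.sasl.kerberos.service.name=kafka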

Data Type Mapping

When converting a TimeBase stream record to a Kafka Connect record, TimeBase data types are mapped to Kafka Connect schema types as follows (a schema sketch follows the list):

  • IntegerDataType -> INT8, INT16, INT32 or INT64 depending on the native data type size.
  • FloatDataType -> FLOAT32 (float) or FLOAT64 (double).
  • CharDataType -> STRING.
  • EnumDataType -> STRING.
  • VarcharDataType -> STRING.
  • BooleanDataType -> BOOLEAN.
  • DateTimeDataType -> INT64 with Avro logicalType = “timestamp-millis”.
  • TimeOfDayDataType -> INT32 with Avro logicalType = “time-millis”.
  • BinaryDataType -> BYTES.
  • ArrayDataType -> Array of the corresponding value type. Complex types with arrays of structures are not supported.
  • ClassDataType -> Not supported. Currently, all fields must have a primitive type or an array type.
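As an illustrative sketch only (the record and field names are hypothetical, and actual schemas also depend on field nullability), a TimeBase message with an INT64 IntegerDataType field, a VarcharDataType field, a DateTimeDataType field, and a TimeOfDayDataType field would surface in an Avro value schema roughly as follows:

{
  "type": "record",
  "name": "SampleMessage",
  "fields": [
    {"name": "size", "type": "long"},
    {"name": "note", "type": "string"},
    {"name": "time", "type": {"type": "long", "logicalType": "timestamp-millis"}},
    {"name": "tod",  "type": {"type": "int",  "logicalType": "time-millis"}}
  ]
}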

Limitations

Limitations of the current version 0.1:

  • Polymorphic TimeBase streams are not fully supported. The Source Connector can only handle a single message type, specified with the timebase.message.type property.
  • Field values encoded with the Alphanumeric and Decimal64 encoders are not converted.