Kafka
Overview
Kafka Connect is a framework for streaming data into or out of Apache Kafka®. This document explains how to use the Kafka Connect connector for TimeBase. The connector can import data from a TimeBase stream into a Kafka topic (the so-called Source Connector) or export data in the reverse direction (the so-called Sink Connector). Refer to Kafka resources to learn more about Kafka Connect technology.
Installation
The plugin is distributed as a Kafka Connect Docker image or as a JAR file, for example timebase-kafka-connector-0.7.1-all.jar. When installing the plugin JAR, add it to the classpath of the Kafka Connect worker (via the worker plugin.path property). The connector can be executed in standalone or distributed mode (more about this later).
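For example, assuming the JAR is placed in /kafka-connect-tb/lib (the same path used in the worker configuration sample later in this document), the worker properties would include:
plugin.path=/kafka-connect-tb/lib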
- The Source Connector class is called deltix.kafka.connect.TBSourceConnector.
- The Sink Connector class is called deltix.kafka.connect.TBSinkConnector.
Source Connector
The TimeBase Source Connector copies data from the specified TimeBase stream to a Kafka topic. It supports the following configuration properties:
- timebase.url - TimeBase server URL.
- timebase.user - TimeBase username.
- timebase.pwd - TimeBase user password.
- timebase.stream - Source stream name.
- timebase.message.id - Optional name of an INT64 field containing a unique message ID. Used to keep track of the last recorded message when the plugin is restarted. When omitted, the timestamp and a per-timestamp message counter are used instead.
- timebase.message.type - TimeBase message type name. Used to select the message type in polymorphic TimeBase streams. This parameter is only required when replicating polymorphic streams.
- topic - Kafka target topic the data is copied to.
- instrument.field - Optional Kafka topic field for storing the TimeBase message Instrument Type.
- symbol.field - Optional Kafka topic field for storing the TimeBase message Symbol.
- time.field - Optional Kafka topic field for storing the TimeBase message Time.
- key.fields - Optional comma-separated list of TimeBase message fields used as the record key in the Kafka topic. The key string is created by concatenating the field values in the specified order with '_' as a separator.
- include.fields - Optional comma-separated list of stream fields. When specified, only these fields are copied to Kafka.
- exclude.fields - Optional comma-separated list of stream fields that should not be copied to Kafka.
- rename.fields - Optional comma-separated list of top-level field names and their aliases (new names to be used in Kafka). The original name and alias must be separated by ':'. For instance, the property to rename the business_time field to time, and the business_operation field to operation, looks like this: rename.fields=business_time:time,business_operation:operation (see also the sketch after this list).
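As an illustration, several of the field mapping properties above can be combined in the same connector configuration. The field names below (account, order_id, business_time, business_operation) are hypothetical and only show the expected format:
key.fields=account,order_id
include.fields=account,order_id,business_time,business_operation
rename.fields=business_time:time,business_operation:operation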
Sink Connector
The TimeBase Sink Connector copies data from the specified Kafka topic to a TimeBase stream. It supports the following configuration properties:
- timebase.url - TimeBase server URL.
- timebase.user - TimeBase username.
- timebase.pwd - TimeBase user password.
- timebase.stream - Destination stream name.
- timebase.message.id - Optional TimeBase message field for storing the Kafka record offset.
- timebase.message.key - Optional TimeBase message field for storing the Kafka record key.
- topics - Kafka topic from which data will be copied. The plugin supports copying from a single topic only.
- instrument.field - Optional Kafka topic field to copy to the TimeBase message Instrument Type.
- symbol.field - Optional Kafka topic field to copy to the TimeBase message Symbol.
- time.field - Optional Kafka topic INT64 field to copy to the TimeBase message Time.
- include.fields - Optional comma-separated list of top-level topic fields. When specified, only these fields are copied to TimeBase.
- exclude.fields - Optional comma-separated list of top-level topic fields that should not be copied to TimeBase.
- timestamp.fields - Optional comma-separated list of INT64 fields to be interpreted as the Avro logical type timestamp-millis (TimeBase DateTimeDataType).
- flatten.fields - Optional comma-separated list of top-level Struct fields whose attributes should be written as top-level attributes to TimeBase. Field names will contain a '.'-separated field name path. For instance, the business_time field of the datahub_operation_metadata structure field will appear at the top level as datahub_operation_metadata.business_time.
- rename.fields - Optional comma-separated list of top-level field names and their aliases (new names to be used in TimeBase). The original name and alias must be separated by ':'. For instance, using the example above where we flattened datahub_operation_metadata, the business_time field can be renamed to the bt TimeBase field with this rename.fields attribute: rename.fields=datahub_operation_metadata.business_time:bt (see also the sketch after this list).
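A minimal sketch combining flatten.fields and rename.fields as described above (the datahub_operation_metadata and business_time field names are the illustrative names from the example, not required names):
flatten.fields=datahub_operation_metadata
rename.fields=datahub_operation_metadata.business_time:bt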
Transformations
The Kafka Connect framework supports Single Message Transformations (SMTs) that are applied to messages as they flow through Kafka Connect. In the TimeBase to Kafka case, SMTs transform messages after the Source Connector has produced them, but before they are written to Kafka. In the Kafka to TimeBase case, SMTs transform Kafka messages before they are passed to the Sink Connector.
Kafka Connect provides a number of SMTs. Some of the transformations overlap with the field mapping features supported by the plugins. Transformations are specified in the same connector properties file, and multiple transformations can be chained together. Refer to the Kafka Connect documentation for examples and more information on how to use them.
Below is an example of transformation properties that convert Avro logical date fields to strings before they are written to TimeBase by the Sink Connector. Without these transformations, the fields hire_date and dismissal_date would be output as integers. In this case we chain two transformations (convert_hd and convert_dd) together:
transforms=convert_hd,convert_dd
transforms.convert_hd.field=hire_date
transforms.convert_hd.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.convert_hd.format=yyyy-MM-dd
transforms.convert_hd.target.type=string
transforms.convert_dd.field=dismissal_date
transforms.convert_dd.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.convert_dd.format=yyyy-MM-dd
transforms.convert_dd.target.type=string
Deploying Plugins in Standalone Mode
To run connectors in standalone mode, execute the worker using the Kafka connect-standalone script:
bin/connect-standalone worker.properties \
connector1.properties connector2.properties connector3.properties ...
Here is a sample worker.properties file with a Kafka Connect worker configuration that uses Avro serialization and integrates with the Confluent Platform Schema Registry:
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
offset.storage.file.filename=/tmp/connect.offsets
plugin.path=/kafka-connect-tb/lib
A sample of the standalone TimeBase Source Connector configuration file:
name=tb-source-connector
connector.class=deltix.kafka.connect.TBSourceConnector
tasks.max=1
timebase.url=dxtick://localhost:8011
timebase.user=testuser
timebase.pwd=testpwd
timebase.stream=messages
timebase.message.id=sequence
topic=tb-messages
instrument.field=instrument_type
symbol.field=symbol
time.field=time
A sample of the standalone TimeBase Sink Connector configuration file:
name=tb-sink-connector
connector.class=deltix.kafka.connect.TBSinkConnector
tasks.max=1
timebase.url=dxtick://localhost:8011
timebase.user=testuser
timebase.pwd=testpwd
timebase.stream=messages
timebase.message.id=sequence
topics=tb-messages
instrument.field=instrument_type
symbol.field=symbol
time.field=time
Deploying Plugins in Distributed Mode
To run plugins in distributed mode, start one or more workers, either using the Kafka connect-distributed.sh script or in Docker using the Confluent Platform Kafka Connect Docker image. Here is an example of a command that runs a worker in a Docker container:
# example for your reference
docker run -d \
--name="kafka-tbconnect" \
--net=host \
-e CONNECT_BOOTSTRAP_SERVERS=kafka_server:port \
-e CONNECT_REST_PORT=8083 \
-e CONNECT_GROUP_ID="tbconnect" \
-e CONNECT_CONFIG_STORAGE_TOPIC="tbconnect.docker-connect-configs" \
-e CONNECT_OFFSET_STORAGE_TOPIC="tbconnect.docker-connect-offsets" \
-e CONNECT_STATUS_STORAGE_TOPIC="tbconnect.docker-connect-status" \
-e CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1 \
-e CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1 \
-e CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1 \
-e CONNECT_KEY_CONVERTER="org.apache.kafka.connect.storage.StringConverter" \
-e CONNECT_VALUE_CONVERTER="io.confluent.connect.avro.AvroConverter" \
-e CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL="URL:port" \
-e CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL="URL:port" \
-e CONNECT_REST_ADVERTISED_HOST_NAME="localhost" \
-e CONNECT_LOG4J_ROOT_LOGLEVEL=INFO \
-e CONNECT_PLUGIN_PATH=/usr/share/java,/var/customlib \
-v /home/user/kafka-connect-tb/lib:/var/customlib \
confluentinc/cp-kafka-connect:latest
All workers in the same group must share the group ID and the Kafka topics used for storing config, offset, and status data. CONNECT_PLUGIN_PATH includes the /var/customlib folder that contains the TimeBase plugin JAR.
After the worker is launched, it starts a web server on the specified CONNECT_REST_PORT that provides a REST API for managing connectors. The command to add a TimeBase Source Connector will look like this:
curl -X POST -H "Content-Type: application/json" --data @kafka-connect-tb/config/connector_TBSourceAppActivity_config.json http://$CONNECT_HOST:8083/connectors
Here, the connector_TBSourceAppActivity_config.json file contains the connector configuration in JSON format (example for your reference):
{
"name": "TBSourceAppActivity",
"config": {
"connector.class": "deltix.kafka.connect.TBSourceConnector",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"timebase.url": "dxtick://10.6.220.90:8011",
"timebase.stream": "application_activity",
"timebase.message.id": "",
"topic": "tbconnect.application-activity-extended",
"instrument.field": "instrument_type",
"symbol.field": "symbol",
"time.field": "time",
"value.converter.schema.registry.url": "URL:port"
}
}
The command to check the connector status:
curl -s -X GET http://$CONNECT_HOST:8083/connectors/TBSourceAppActivity/status
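The same REST API can also be used to restart or remove the connector. For reference, these are standard Kafka Connect endpoints, not specific to the TimeBase plugin:
curl -s -X POST http://$CONNECT_HOST:8083/connectors/TBSourceAppActivity/restart
curl -s -X DELETE http://$CONNECT_HOST:8083/connectors/TBSourceAppActivity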
Security Settings
With security enabled on the Kafka server, the config will need to include additional security settings:
group.id=<kafka-group-id>
ssl.truststore.location=<jks-truststore-file>
ssl.truststore.password=<jks-truststore-password>
security.protocol=SASL_SSL
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
useKeyTab=true \
principal="<principal-name>@xxx.com" \
keyTab="<keytab-file>" \
storeKey=true;
This set of properties also needs to be specified with an additional consumer. prefix on each property name for a Sink Connector, and a producer. prefix when using a Source Connector, for instance consumer.group.id (see the sketch after the KAFKA_OPTS example below). In addition, before running the connect-standalone or connect-distributed worker script, set the KAFKA_OPTS environment variable to include these Kerberos JVM properties:
export KAFKA_OPTS="-Djava.security.krb5.realm=xxx.com -Djava.security.krb5.kdc=xxx.com -Djava.security.auth.login.config=/home/centos/tbconnect/config/kafka_client_jaas.conf -Dsun.security.krb5.debug=false"
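For example, the prefixed security settings for a Sink Connector would look like the sketch below (same placeholder values as in the block above; the full set of properties, including the sasl.jaas.config entry, would be prefixed the same way):
consumer.group.id=<kafka-group-id>
consumer.ssl.truststore.location=<jks-truststore-file>
consumer.ssl.truststore.password=<jks-truststore-password>
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=GSSAPI
consumer.sasl.kerberos.service.name=kafka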
Data Type Mapping
When converting a TimeBase stream record to a Kafka Connect record, TimeBase data types are mapped to Kafka Connect Schema types as follows:
- IntegerDataType -> INT8, INT16, INT32, or INT64, depending on the native data type size.
- FloatDataType -> FLOAT32 (float) or FLOAT64 (double).
- CharDataType -> STRING.
- EnumDataType -> STRING.
- VarcharDataType -> STRING.
- BooleanDataType -> BOOLEAN.
- DateTimeDataType -> INT64 with Avro logicalType = "timestamp-millis".
- TimeOfDayDataType -> INT32 with Avro logicalType = "time-millis".
- BinaryDataType -> BYTES.
- ArrayDataType - Array of value types. Complex types with arrays of structures are not supported.
- ClassDataType - Not supported. Currently, all fields must have a primitive type or an array type.
Limitations
Limitations of the current version 0.1:
- Polymorphic TimeBase streams are not fully supported. The Source Plugin can only handle a single message type, specified with the timebase.message.type attribute.
- Field values encoded with the Alphanumeric and Decimal64 encoders are not converted.