Kafka Writer
Writes to a topic in Apache Kafka 0.11 or 2.1. Support for Kafka 0.8, 0.9, and 0.10 is deprecated.
There are two versions of KafkaWriter, 0.11.0 and 2.1.0. Use the one that corresponds to the target Kafka broker. For example, to use 2.1.0, the syntax is CREATE TARGET <name> USING KafkaWriter VERSION '2.1.0'. If writing to the internal Kafka instance, use 0.11.0.
Kafka Writer properties
property | type | default value | notes |
---|---|---|---|
Broker Address | String | | |
Kafka Config | String | | Specify any properties required by the authentication method used by the specified Kafka broker (see Configuring authentication in Kafka Config). Optionally, specify Kafka producer properties, separated by semicolons; see the table below for details. When writing to a topic in Confluent Cloud, specify the appropriate SASL properties. To send messages in Confluent wire format, set value.serializer as described in the table below. |
Kafka Config Property Separator | String | ; | Available only in Kafka 0.11 and later versions. Specify a different separator if one of the producer property values specified in KafkaConfig contains a semicolon. |
Kafka Config Value Separator | String | = | Available only in Kafka 0.11 and later versions. Specify a different separator if one of the producer property values specified in KafkaConfig contains an equals sign. |
Message Header | String | | Optionally, if using Kafka 0.11 or later in async mode, or in sync mode with the appropriate KafkaConfig settings, specify one or more custom headers to add to messages. To specify multiple custom headers, separate them with semicolons. |
Message Key | String | | Optionally, if using Kafka 0.11 or later in async mode, or in sync mode with the appropriate KafkaConfig settings, specify one or more keys to add to messages as key-value pairs, for example: MessageKey : CName="Striim" / MessageKey : Table=@metadata(TableName); Operation=@metadata(OperationName); key1=@userdata(key1) / MessageKey : CityName=City; Zipcode=zip / MessageKey : CName="Striim"; Table=@metadata(TableName); Operation=@metadata(OperationName). Among other possibilities, you may use this property to support log compaction or to allow downstream applications to use queries based on the message payload. |
Mode | String | Sync | See Setting KafkaWriter's mode property: sync versus async. |
Parallel Threads | Integer | | See Creating multiple writer instances. |
Partition Key | String | | The name of a field in the input stream whose values determine how events are distributed among multiple partitions. Events with the same partition key field value will be written to the same partition. If the input stream is of any type except WAEvent, specify the name of one of its fields. If the input stream is of the WAEvent type, specify a field in the METADATA map (see WAEvent contents for change data) using the @metadata(<field name>) syntax shown in the Message Key examples above. |
Topic | String | | The existing Kafka topic to write to (will not be created if it does not exist). If more than one Kafka Writer writes to the same topic, recovery is not supported (see Recovering applications). (Recovery is supported when using Parallel Threads.) |
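To illustrate how several of these properties fit together, here is a minimal sketch of a target definition. It is not part of the sample application below; the broker address, topic, and target names are placeholders, and the MessageKey and PartitionKey values assume the PosDataStream created in the sample application later in this section.

CREATE TARGET KafkaPropSketch USING KafkaWriter VERSION '2.1.0' (
  brokeraddress: 'kafkabroker.example.com:9092',  -- placeholder broker address
  topic: 'MyExistingTopic',                       -- topic must already exist
  mode: 'async',
  MessageKey: 'Merchant=merchantId; Zip=zip',     -- field names from PosDataStream
  PartitionKey: 'merchantId'
)
FORMAT USING JSONFormatter ()
INPUT FROM PosDataStream;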
This adapter has a choice of formatters. See Supported writer-formatter combinations for more information.
Notes on the KafkaConfig property
With the exceptions noted in the following table, you may specify any Kafka producer property in KafkaConfig.
Kafka producer property | notes |
---|---|
acks | |
batch.size, linger.ms, retries | |
enable.idempotence | When using version 2.1.0 and async mode, set to true to write events in order. |
value.serializer | default value is org.apache.kafka.common.serialization.ByteArraySerializer; to write messages in Confluent wire format, set to io.confluent.kafka.serializers.KafkaAvroSerializer |
Internally, KafkaWriter invokes KafkaConsumer for various purposes. Any WARNING from the consumer API caused by passing KafkaConfig properties can be safely ignored.
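For instance, a hypothetical target passing several producer properties through KafkaConfig might look like the following. The broker, topic, and property values are placeholders; if any property value contained a semicolon or equals sign, you would also set Kafka Config Property Separator or Kafka Config Value Separator as described in the properties table above.

CREATE TARGET KafkaConfigSketch USING KafkaWriter VERSION '2.1.0' (
  brokeraddress: 'kafkabroker.example.com:9092',  -- placeholder broker address
  topic: 'MyExistingTopic',
  KafkaConfig: 'batch.size=500000;linger.ms=1000;retries=3'  -- producer properties separated by semicolons
)
FORMAT USING JSONFormatter ()
INPUT FROM PosDataStream;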
KafkaWriter sample application
The following sample code writes data from PosDataPreview.csv to the Kafka topic KafkaWriterSample. This topic already exists in Striim's internal Kafka instance. If you are using an external Kafka instance, you must create the topic before running the application.
CREATE SOURCE PosSource USING FileReader (
  directory:'Samples/PosApp/AppData',
  wildcard:'PosDataPreview.csv',
  positionByEOF:false
)
PARSE USING DSVParser (
  header:yes
)
OUTPUT TO RawStream;

CREATE CQ CsvToPosData
INSERT INTO PosDataStream
SELECT TO_STRING(data[1]) as merchantId,
  TO_DATEF(data[4],'yyyyMMddHHmmss') as dateTime,
  TO_DOUBLE(data[7]) as amount,
  TO_STRING(data[9]) as zip
FROM RawStream;

CREATE TARGET KW11Sample USING KafkaWriter VERSION '0.11.0' (
  brokeraddress:'localhost:9092',
  topic:'KafkaWriterSample'
)
FORMAT USING DSVFormatter ()
INPUT FROM PosDataStream;
You can verify that data was written to Kafka by running the Kafka Reader sample application.
The first field in the output (position) stores information required to avoid lost or duplicate events after recovery (see Recovering applications). If recovery is not enabled, its value is NULL.
mon output (see Using the MON command) for targets using KafkaWriter includes:

- in async mode only, Sent Bytes Rate: how many megabytes per second were sent to the brokers
- in both sync and async mode, Write Bytes Rate: how many megabytes per second were written by the brokers and for which acknowledgement was received by Striim
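For example, to view these statistics for the sample application's target in the Striim console (qualify the target name with its namespace if necessary):

mon KW11Sample;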
Enabling compression
When you enable compression in KafkaWriter, the broker and consumer should handle the compressed batches automatically. No additional configuration should be required in Kafka.
To enable batch compression for version 0.8.0, include the compression.codec property in KafkaConfig. Supported values are gzip and snappy. For example:

KafkaConfig:'compression.codec=snappy'
To enable compression for version 0.9, 0.10, or 0.11, include the compression.type property in KafkaConfig. Supported values are gzip, lz4, and snappy. For example:

KafkaConfig:'compression.type=snappy'
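For example, a sketch of the sample application's target (shown above) with compression enabled; only the added KafkaConfig property differs from the original target definition:

CREATE TARGET KW11Sample USING KafkaWriter VERSION '0.11.0' (
  brokeraddress:'localhost:9092',
  topic:'KafkaWriterSample',
  KafkaConfig:'compression.type=snappy'  -- enables snappy compression for batches
)
FORMAT USING DSVFormatter ()
INPUT FROM PosDataStream;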
Writing to multiple Kafka partitions
If the INPUT FROM stream is partitioned, events will be distributed among Kafka partitions based on the values in the input stream's PARTITION BY property. All events with the same value in the PARTITION BY field will be written to the same randomly selected partition. Striim will distribute the data evenly among the partitions to the extent allowed by the frequency of the various PARTITION BY field values (for example, if 80% of the events have the same value, then one of the Kafka partitions will contain at least 80% of the events). In the example above, to enable partitioning by city, you would revise the definition of TransformedDataStream as follows:
CREATE STREAM TransformedDataStream OF TransformedDataType PARTITION BY City;
To override this default behavior and send events to specific partitions based on the PARTITION BY field values, see Creating a custom Kafka partitioner.
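As a comparable sketch using the sample application's data, you could declare the stream type explicitly and partition PosDataStream by zip, so that events sharing a zip value are written to the same Kafka partition. The type name here is hypothetical; the fields match the CsvToPosData query above.

CREATE TYPE PosDataType (
  merchantId String,
  dateTime DateTime,
  amount Double,
  zip String
);
CREATE STREAM PosDataStream OF PosDataType PARTITION BY zip;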
Setting KafkaWriter's mode property: sync versus async
KafkaWriter performs differently depending on whether the mode property value is sync or async and whether recovery (see Recovering applications) is enabled for the application. The four possibilities are:
mode and recovery setting | notes |
---|---|
sync with recovery | Provides the most accurate output. Events are written in order with no duplicates ("exactly-once processing," also known as E1P), provided that you do not change the partitioner logic, number of partitions, or IP address used by Striim while the application is stopped. To avoid duplicate events after recovery, the Kafka topic's retention period must be longer than the amount of time that elapses before recovery is initiated (see Recovering applications) and, if writing to multiple partitions, the brokers must be brought up in reverse of the order in which they went down. With this configuration, two KafkaWriter targets (even if in the same application) cannot write to the same topic. Instead, use a single KafkaWriter, a topic with multiple partitions (see Writing to multiple Kafka partitions), and, if necessary, parallel threads (see Creating multiple writer instances). |
async with recovery | Provides higher throughput with the tradeoff that events are written out of order (unless using KafkaWriter version 2.1 with enable.idempotence set to true in KafkaConfig) and recovery may result in some duplicates ("at-least-once processing," also known as A1P). |
sync without recovery | Appropriate with non-recoverable sources (see Recovering applications) when you need events to be written in order. Otherwise, async will give better performance. |
async without recovery | Appropriate with non-recoverable sources when you don't care whether events are written in order. Throughput will be slightly faster than async with recovery. |
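As an illustration of the "sync with recovery" row, a minimal application skeleton might look like the following. The application name and recovery interval are placeholders, and the recovery clause is shown only as a sketch (see Recovering applications for the exact syntax); the target reuses the sample application's stream and topic.

CREATE APPLICATION KafkaSyncApp RECOVERY 10 SECOND INTERVAL;  -- recovery clause is illustrative

CREATE TARGET KWSyncSample USING KafkaWriter VERSION '2.1.0' (
  brokeraddress:'localhost:9092',
  topic:'KafkaWriterSample',
  mode:'sync'
)
FORMAT USING DSVFormatter ()
INPUT FROM PosDataStream;

END APPLICATION KafkaSyncApp;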
When using sync, multiple events are batched in a single Kafka message. The maximum size of a batch is controlled by the batch.size parameter, which by default is 1 million bytes. The maximum amount of time KafkaWriter will wait between messages is set by the linger.ms parameter, which by default is 1000 milliseconds. Thus, by default, KafkaWriter will write a message after it has received a million bytes or one second has elapsed since the last message was written, whichever occurs first.
batch.size must be larger than the largest event KafkaWriter will receive, but must not exceed the max.message.bytes size in the Kafka topic configuration.
The following setting would write a message every time KafkaWriter has received 500,000 bytes or two seconds have elapsed since the last message was written:
KafkaConfig:'batch.size=500000;linger.ms=2000'
KafkaWriter output with DSVFormatter
Each output record will begin with a field containing information Striim can use to ensure that no duplicate records are written during recovery (see Recovering applications).
For input events of this user-defined Striim type:
CREATE TYPE emptype ( id int, name string);
output would be similar to:
1234002350,1,User One
For input events of type WAEvent, output (Position, TableName, CommitTimestamp, Operation, data) would be similar to:
SCN:1234002344,Employee,12-Dec-2016 19:13:00,INSERT,1,User One
KafkaWriter output with JSONFormatter
Each JSON output node will contain a field named __striimmetadata with a nested field position containing information Striim can use to ensure that no duplicate records are written during recovery (see Recovering applications).
For input events of this user-defined Striim type:
CREATE TYPE emptype ( id int, name string);
output would be similar to:
{ "ID": 1, "Name": "User One", "__striimmetadata" : {"position" : "SCN:1234002344" } }
For input events of type WAEvent, output would be similar to:
{ "metadata" : { "TABLENAME" : "Employee","CommitTimestamp" : "12-Dec-2016 19:13:00", "OperationName" : "INSERT" } "data" : { "ID" : "1", "NAME" : "User One"}, "__striimmetadata" : { "position" : "SCN:1234002350" } // but in binary format }
KafkaWriter output with XMLFormatter
Each output record will include an extra element __striimmetadata with a nested element position containing information Striim can use to ensure that no duplicate records are written during recovery (see Recovering applications).
For input events of this user-defined Striim type:
CREATE TYPE emptype ( id int, name string);
output would be similar to:
...
<ID> 1 </ID>
<Name> User One </Name>
<__striimmetadata>
  <position>1234002344</position>
</__striimmetadata>
...