Kafka Writer

Writes to a topic in Apache Kafka 0.11, 2.1, or 3.3.2. Support for Kafka 0.8, 0.9, and 0.10 is deprecated.

Use the version of KafkaWriter that corresponds to the target Kafka broker. For example, to write to a Kafka 2.1 cluster, the syntax is CREATE TARGET <name> USING KafkaWriter VERSION '2.1.0'; for a Kafka 3.3 cluster, use VERSION '3.3.2'. If writing to the internal Kafka instance, use VERSION '0.11.0'.
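A minimal sketch of that syntax (the target, stream, broker address, and topic names here are placeholders):

CREATE TARGET MyKafkaTarget USING KafkaWriter VERSION '3.3.2' (
  brokeraddress:'kafkahost:9092',
  topic:'MyTopic'
)
FORMAT USING JSONFormatter ()
INPUT FROM MyStream;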

Kafka Writer properties

Broker Address (String)

Kafka Config (String)

Specify any properties required by the authentication method used by the specified Kafka broker (see Configuring authentication in Kafka Config).

Optionally, specify Kafka producer properties, separated by semicolons. See the table below for details.

When writing to a topic in Confluent Cloud, specify the appropriate SASL properties.

To send messages in Confluent wire format, specify value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer. When Mode is Sync, also specify batch.size=-1 (not necessary when Mode is Async).
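For example, assuming the default property and value separators, the KafkaConfig value would be similar to:

KafkaConfig:'value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer;batch.size=-1'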

Kafka Config Property Separator (String, default: ;)

Available only in Kafka 0.11 and later versions. Specify a different separator if one of the producer property values specified in KafkaConfig contains a semicolon.
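For example, a sasl.jaas.config value ends in a semicolon, so it requires a different property separator. A sketch, assuming the property is written as KafkaConfigPropertySeparator in TQL and using placeholder credentials:

KafkaConfigPropertySeparator:'|',
KafkaConfig:'sasl.mechanism=PLAIN|sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="myuser" password="mypass";'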

Kafka Config Value Separator (String, default: =)

Available only in Kafka 0.11 and later versions. Specify a different separator if one of the producer property values specified in KafkaConfig contains an equals sign.

Message Header (String)

Optionally, if using Kafka 0.11 or later in async mode, or in sync mode with KafkaConfig batch.size=-1, specify one or more custom headers to be added to messages as key-value pairs. Values may be:

  • a field name from an input stream of a user-defined type: for example, MerchantID=merchantID

  • a static string: for example, Company="My Company"

  • a function: for example, to get the source table name from a WAEvent input stream that is the output of a CDC reader, TableName=@metadata(TableName) (for more information, see WAEvent functions)

To specify multiple custom headers, separate them with semicolons.
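A sketch combining the examples above (assuming the property is written as MessageHeader in TQL):

MessageHeader:'MerchantID=merchantID;Company="My Company";TableName=@metadata(TableName)'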

Message Key (String)

Optionally, if using Kafka 0.11 or later in async mode, or in sync mode with KafkaConfig batch.size=-1, specify one or more keys to be added to messages as key-value pairs. The property value may be a static string, one or more fields from the input stream, or a combination of both. Examples:

MessageKey : CName="Striim"

MessageKey : Table=@metadata(TableName);
  Operation=@metadata(OperationName);key1=@userdata(key1)

MessageKey : CityName=City; Zipcode=zip

MessageKey : CName="Striim";Table=@metadata(TableName);
  Operation=@metadata(OperationName)

Among other possibilities, you may use this property to support log compaction or to allow downstream applications to use queries based on the message payload.

Mode (String, default: Sync)

See Setting KafkaWriter's mode property: sync versus async.

Parallel Threads (Integer)

See Creating multiple writer instances.

Partition Key (String)

The name of a field in the input stream whose values determine how events are distributed among multiple partitions. Events with the same partition key field value will be written to the same partition.

If the input stream is of any type except WAEvent, specify the name of one of its fields.

If the input stream is of the WAEvent type, specify a field in the METADATA map (see WAEvent contents for change data) using the syntax @METADATA(<field name>), or a field in the USERDATA map (see Adding user-defined data to WAEvent streams) using the syntax @USERDATA(<field name>). If appropriate, you may concatenate multiple METADATA and/or USERDATA fields.
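For example (a sketch, assuming the property is written as PartitionKey in TQL):

PartitionKey:'merchantId'
PartitionKey:'@METADATA(TableName)'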

Topic (String)

The existing Kafka topic to write to (will not be created if it does not exist). If more than one KafkaWriter target writes to the same topic, recovery is not supported (see Recovering applications). (Recovery is supported when using Parallel Threads.)

This adapter has a choice of formatters. See Supported writer-formatter combinations for more information.

Notes on the KafkaConfig property

With the exceptions noted in the following table, you may specify any Kafka producer property in KafkaConfig. 

acks

  • in sync mode, may be set to 1 or all

  • in async mode, may be set to 0, 1, or all

batch.size, linger.ms, retries

  • In sync mode, to prevent out-of-order events, these producer properties set in KafkaConfig will be unchanged and ignored, and Striim will handle them internally.

  • In async mode, Striim will update the Kafka producer properties and these will be handled by Kafka.

  • In sync mode, you may set batch.size=-1 to write one event per Kafka message. This will seriously degrade performance, so it is not recommended in a production environment. With this setting, messages will be similar to those in async mode.

enable.idempotence

When using version 2.1.0 and async mode, set to true to write events in order.
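For example, the KafkaConfig setting would be similar to:

KafkaConfig:'enable.idempotence=true'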

value.serializer

Default value is org.apache.kafka.common.serialization.ByteArraySerializer; to write messages in Confluent wire format, set to io.confluent.kafka.serializers.KafkaAvroSerializer.

Internally, KafkaWriter invokes KafkaConsumer for various purposes. The WARNING from the consumer API due to passing KafkaConfig properties can be safely ignored.

KafkaWriter sample application

The following sample code writes data from PosDataPreview.csv to the Kafka topic KafkaWriterSample. This topic already exists in Striim's internal Kafka instance. If you are using an external Kafka instance, you must create the topic before running the application.

CREATE SOURCE PosSource USING FileReader (
  directory:'Samples/PosApp/AppData',
  wildcard:'PosDataPreview.csv',
  positionByEOF:false
)
PARSE USING DSVParser (
  header:yes
)
OUTPUT TO RawStream;

CREATE CQ CsvToPosData
INSERT INTO PosDataStream
SELECT TO_STRING(data[1]) as merchantId,
  TO_DATEF(data[4],'yyyyMMddHHmmss') as dateTime,
  TO_DOUBLE(data[7]) as amount,
  TO_STRING(data[9]) as zip
FROM RawStream;

CREATE TARGET KW11Sample USING KafkaWriter VERSION '0.11.0'(
  brokeraddress:'localhost:9092',
  topic:'KafkaWriterSample'
)
FORMAT USING DSVFormatter ()
INPUT FROM PosDataStream;

You can verify that data was written to Kafka by running the Kafka Reader sample application.

The first field in the output (position) stores information required to avoid lost or duplicate events after recovery (see Recovering applications). If recovery is not enabled, its value is NULL.

MON output (see Using the MON command) for targets using KafkaWriter includes:

  • in async mode only, Sent Bytes Rate: how many megabytes per second were sent to the brokers

  • in both sync and async mode, Write Bytes Rate: how many megabytes per second were written by the brokers and acknowledged to Striim

Enabling compression

When you enable compression in KafkaWriter, the broker and consumer should handle the compressed batches automatically. No additional configuration should be required in Kafka.

To enable compression for version 0.11, include the compression.type property in KafkaConfig. Supported values are gzip, lz4, and snappy. For example:

KafkaConfig:'compression.type=snappy'

Writing to multiple Kafka partitions

If the INPUT FROM stream is partitioned, events will be distributed among Kafka partitions based on the values in the input stream's PARTITION BY property. All events with the same value in the PARTITION BY field will be written to the same randomly selected partition. Striim will distribute the data as evenly among the partitions as the frequency of the various PARTITION BY field values allows (for example, if 80% of the events have the same value, then one of the Kafka partitions will contain at least 80% of the events). To enable partitioning by city, for instance, you would define TransformedDataStream as follows:

CREATE STREAM TransformedDataStream OF TransformedDataType PARTITION BY City;

To override this default behavior and send events to specific partitions based on the PARTITION BY field values, see Creating a custom Kafka partitioner.

Setting KafkaWriter's mode property: sync versus async

KafkaWriter performs differently depending on whether the mode property value is sync or async and whether recovery (see Recovering applications) is enabled for the application. The four possibilities are:

sync with recovery 

Provides the most accurate output. Events are written in order with no duplicates ("exactly-once processing," also known as E1P), provided that you do not change the partitioner logic, number of partitions, or IP address used by Striim while the application is stopped.

To avoid duplicate events after recovery, the Kafka topic's retention period must be longer than the amount of time that elapses before recovery is initiated (see Recovering applications) and, if writing to multiple partitions, the brokers must be brought up in reverse of the order in which they went down.

With this configuration, two KafkaWriter targets (even if in the same application) cannot write to the same topic. Instead, use a single KafkaWriter, a topic with multiple partitions (see Writing to multiple Kafka partitions), and, if necessary, parallel threads (see Creating multiple writer instances).

async with recovery

Provides higher throughput with the tradeoff that events are written out of order (unless using KafkaWriter version 2.1 with enable.idempotence set to true in KafkaConfig) and recovery may result in some duplicates ("at-least-once processing," also known as A1P).

sync without recovery

Appropriate with non-recoverable sources (see Recovering applications) when you need events to be written in order. Otherwise, async will give better performance.

async without recovery

Appropriate with non-recoverable sources when you don't care whether events are written in order. Throughput will be slightly faster than async with recovery.
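For example, a minimal sketch of a target set to async mode (assuming the property is written as mode in TQL; other values follow the sample application below):

CREATE TARGET KafkaAsyncSample USING KafkaWriter VERSION '3.3.2' (
  brokeraddress:'localhost:9092',
  topic:'KafkaWriterSample',
  mode:'Async'
)
FORMAT USING DSVFormatter ()
INPUT FROM PosDataStream;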

When using sync, multiple events are batched in a single Kafka message. The maximum size of a batch is controlled by the batch.size parameter, which by default is 1 million bytes. The maximum amount of time KafkaWriter will wait between messages is set by the linger.ms parameter, which by default is 1000 milliseconds. Thus, by default, KafkaWriter will write a message after it has received a million bytes or one second has elapsed since the last message was written, whichever occurs first.

The batch.size value must be larger than the largest event KafkaWriter will receive, but must not exceed the max.message.bytes setting in the Kafka topic configuration.

The following setting would write a message every time KafkaWriter has received 500,000 bytes or two seconds has elapsed since the last message was written:

KafkaConfig:'batch.size=500000;linger.ms=2000'

KafkaWriter output with DSVFormatter

Each output record will begin with a field containing information Striim can use to ensure that no duplicate records are written during recovery (see Recovering applications).

For input events of this user-defined Striim type:

CREATE TYPE emptype ( id int, name string);

output would be similar to:

1234002350,1,User One

For input events of type WAEvent, output (Position, TableName, CommitTimestamp, Operation, data) would be similar to:

SCN:1234002344,Employee,12-Dec-2016 19:13:00,INSERT,1,User One

KafkaWriter output with JSONFormatter

Each JSON output node will contain a field named __striimmetadata with a nested field position containing information Striim can use to ensure that no duplicate records are written during recovery (see Recovering applications).

For input events of this user-defined Striim type:

CREATE TYPE emptype ( id int, name string);

output would be similar to:

{
    "ID": 1,
    "Name": "User One",
    "__striimmetadata": { "position": "SCN:1234002344" }
}

For input events of type WAEvent, output would be similar to:

{
    "metadata": { "TABLENAME": "Employee", "CommitTimestamp": "12-Dec-2016 19:13:00",
        "OperationName": "INSERT" },
    "data": {
        "ID": "1",
        "NAME": "User One" },
    "__striimmetadata": { "position": "SCN:1234002350" }
}

(In the actual output, the position value is in binary format.)

KafkaWriter output with XMLFormatter

Each output record will include an extra element __striimmetadata with a nested element position containing information Striim can use to ensure that no duplicate records are written during recovery (see Recovering applications).

For input events of this user-defined Striim type:

CREATE TYPE emptype ( id int, name string);

output would be similar to:

...
<ID> 1 </ID>
<Name> User One </Name> 
<__striimmetadata>
  <position>1234002344</position>
</__striimmetadata> ...