Kafka
Striim can read from Apache Kafka using its Kafka Reader. Kafka Reader uses Kafka client version 3.9.x, which is compatible with Kafka brokers from version 2.x through 4.0. Kafka Reader supports Apache Kafka, Confluent Cloud, Amazon MSK, and Azure Event Hubs.
For information on configuring Kafka streams, see Reading a Kafka stream with KafkaReader and Persisting a stream to Kafka.
See Supported reader-parser combinations for parsing options.
Upgrading from Kafka Reader 2.1
Kafka Reader 2.1 (using Apache Kafka client 2.1) is deprecated as of Striim 5.4.0.2. When upgrading to 5.4.0.2, any Kafka Reader 2.1 adapters in created or quiesced applications are automatically upgraded to Kafka Reader 3.9 and corresponding connection profiles are created.
If you import TQL exported from Striim 5.2 or earlier after upgrading, any Kafka Reader components will use the deprecated Kafka Reader 2.1. To use the current version, edit the TQL before importing to use the new properties, and create a connection profile with the appropriate settings.
The following table shows how properties map from Kafka Reader 2.1 to the current Kafka Reader.
| Kafka Reader 2.1 property | Kafka Reader 3.9 property |
|---|---|
| BrokerAddress, KafkaConfig | Connection Profile Name |
| Topic | Topic |
| StartOffset | StartOffset |
| StartTimestamp | StartTimestamp |
| PartitionIDList | PartitionIDList |
| AutoMapPartition | AutoMapPartition |
| IgnorableException | IgnorableException |
| N/A | ConnectionRetryPolicy (new) |
Note
Kafka Reader 2.1 cannot connect to Kafka 4.0+ brokers. If your brokers are running version 4.0 or newer, you must use Kafka Reader 3.9.
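As a sketch of the migration, a Kafka Reader 2.1 source that configured brokers inline might be rewritten as follows. The component, stream, and profile names are illustrative, and the VERSION clause shown for the old adapter is an assumption about how the deprecated version was declared:

```
-- Kafka Reader 2.1 (deprecated): broker and security settings were inline
CREATE SOURCE LegacyKafkaSource USING KafkaReader VERSION '2.1.0' (
  brokerAddress: 'localhost:9092',
  topic: 'MyTopic',
  KafkaConfig: 'security.protocol=PLAINTEXT'
)
PARSE USING DSVParser ()
OUTPUT TO LegacyStream;

-- Kafka Reader 3.9: broker, authentication, and security settings
-- move to a Kafka connection profile referenced by name
CREATE SOURCE CurrentKafkaSource USING KafkaReader (
  connectionProfileName: 'admin.KafkaCP',
  topic: 'MyTopic'
)
PARSE USING DSVParser ()
OUTPUT TO CurrentStream;
```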
Kafka Reader properties
Kafka Reader requires a Kafka connection profile for broker address, authentication, and security configuration. Create a connection profile before configuring the reader.
| Property | Type | Default value | Notes |
|---|---|---|---|
| Auto Map Partition | Boolean | True | When reading from multiple partitions, if there are multiple servers in the Striim deployment group on which KafkaReader is deployed ON ALL, partitions are distributed automatically among the servers and rebalanced automatically as Striim servers are added to or removed from the group. When deploying on a Forwarding Agent, set to False. |
| Connection Profile Name | String | | Required. The name of the Kafka connection profile to use for broker address, authentication, and security configuration. Kafka Reader uses the same connection profile format as Kafka Writer. |
| Connection Retry Policy | String | initialRetryDelay=10s, retryDelayMultiplier=2, maxRetryDelay=1m, maxAttempts=5, totalTimeout=10m | Determines how connection failures are retried before halting the application. Accepts comma-separated key-value pairs in the same key=value format as the default value. |
| Ignorable Exception | String | | A list of exceptions that Striim should ignore to avoid a system halt. Currently, the only supported exception is OFFSET_MISSING_EXCEPTION, which avoids a system halt when there is a partition purge. A partition purge is Kafka's process of freeing up Kafka message logs after the retention period expires: messages have a TTL (time to live) set by retention period properties, and upon expiry are marked for deletion according to their creation timestamp. When a consumer tries to read from a purged offset, Kafka throws an OFFSET_MISSING_EXCEPTION. Ignored exceptions are written to the application's exception store (see CREATE EXCEPTIONSTORE). |
| Partition ID List | String | | Partition numbers to read from, separated by semicolons (for example, 0;1), or leave blank to read from all partitions. |
| Start Offset | Long | -1 | With the default value of -1, reads from the end of the partition. Change to 0 to read from the beginning of the partition. If you specify startOffset, leave startTimestamp at its default value. You can also specify per-partition values as a list of key-value pairs in the format "key=value; key=value", where each key is a partition ID and each value is an offset (for example, 0=1000;1=2000). |
| Start Timestamp | String | | If not specified, only new transactions (based on current Kafka host system time) are read. Specify a value in the format yyyy-MM-dd hh:mm:ss.SSS (for example, 2017-10-20 13:55:55.000) to start reading from an earlier point. If the Kafka host and Striim host are not in the same time zone, specify the start time using the Striim host's time zone. If you specify startTimestamp, leave startOffset at its default value. You can also specify per-partition values as a list of key-value pairs in the format "key=value; key=value", where each key is a partition ID and each value is a timestamp (for example, 0=2017-10-20 13:55:55.000;1=2017-10-20 14:00:00.000). |
| Topic | String | | Required. The Kafka topic to read from. Do not modify this property when recovery is enabled for the application. |
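As an illustration of the per-partition syntax described above, the following sketch starts partition 0 at offset 1000 and partition 1 at offset 2000. The topic, connection profile, and offset values are hypothetical:

```
CREATE SOURCE KafkaOffsetExample USING KafkaReader (
  connectionProfileName: 'admin.KafkaCP',
  topic: 'MyTopic',
  partitionIDList: '0;1',
  -- per-partition start offsets: partitionID=offset pairs separated by semicolons
  startOffset: '0=1000;1=2000'
)
PARSE USING JSONParser ()
OUTPUT TO KafkaJsonStream;
```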
Reading transactionally committed messages
Kafka Reader can be limited to records that are part of committed Kafka transactions by setting the consumer isolation level in the connection profile.
To read only committed messages, set the following in the Consumer Config section of the connection profile:
isolation.level=read_committed
The default value (if not set) is read_uncommitted. When using read_committed, records from aborted transactions are filtered out by the consumer. Offsets in the log may appear to skip values because Kafka assigns offsets to all records, including aborted ones that are not returned to the reader.
When starting from a specific offset, consumption begins at that offset or the next visible committed record, whichever applies. If the specified offset falls inside an aborted transaction, the reader advances to the next committed record.
Avro key support
Kafka Reader supports Avro record keys with schema registry integration, enabling richer key structures beyond traditional string keys. Features include multi-field keys, nested key objects, and automatic key schema resolution from Confluent or Karapace Schema Registry.
To configure Avro key support, add the key.deserializer property to the Consumer Config section of the connection profile:
key.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
Also configure the schema registry URL in the Avro Parser properties.
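Putting the two settings together, the configuration might look like the following sketch. The registry URL is a placeholder; the first line goes in the connection profile's Consumer Config section, and the parser clause goes in the application's TQL:

```
# Connection profile, Consumer Config section:
key.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer

-- Application TQL, parser clause (registry URL is a placeholder):
PARSE USING Global.AvroParser (
  schemaregistryurl: 'http://schema-registry.example.com:8081'
)
```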
Karapace Schema Registry support
Karapace is an open-source schema registry that is API-compatible with Confluent Schema Registry. Kafka Reader supports Karapace via direct URL configuration in the Avro Parser.
Configure the following Avro Parser properties for Karapace:
| Property | Type | Required | Description |
|---|---|---|---|
| schemaregistryurl | String | Yes | Karapace schema registry endpoint URL, for example https://karapace.example.com:8081 |
| schemaregistryConfiguration | String | No | Authentication configuration string as comma-separated key-value pairs, for example basic.auth.user.info=username:password,basic.auth.credentials.source=USER_INFO |
Example Avro Parser configuration with Karapace and BASIC authentication:
PARSE USING Global.AvroParser (
schemaregistryurl: 'https://karapace.example.com:8081',
schemaregistryConfiguration:
'basic.auth.user.info=username:password,basic.auth.credentials.source=USER_INFO'
)
Consuming DDL events
Kafka Reader can consume Data Definition Language (DDL) changes that originate from OLTP sources and are produced by Striim targets. Kafka Reader treats DDL as normal Kafka records, exposing headers, key, timestamp, and offsets in WAEvent.metadata and passing the payload to the parser.
To identify DDL records in the parser output, check the OperationType field for a value of DDL (with OperationName values such as CREATE, ALTER, or DROP).
Example filter query:
SELECT *
FROM KafkaInput
WHERE ddl_operation IN ("CREATE","ALTER","DROP");
If you only want committed DDL/DML, set isolation.level=read_committed via the connection profile Consumer Config.
Certificate rotation
Kafka Reader supports seamless certificate rotation via connection profiles. New server and client certificate pairs can be generated and applied to a Kafka connection profile even when applications using the profile are running.
The Connection Retry Policy automatically handles temporary connection failures during certificate rotation. When certificates are updated, the reader reconnects under the configured retry policy. Jitter is applied to prevent multiple Kafka Readers from retrying simultaneously during rotation events.
To rotate certificates:
Navigate to Manage Striim > Connection Profiles.
Locate the connection profile in use and click Edit.
Upload or reference the new certificate and key files.
Save the changes to apply the new certificates.
Note
Rotated certificates take effect when an adapter encounters an exception and retries, or on application restart.
WAEvent metadata
Kafka Reader exposes the following metadata fields on each WAEvent:
- TopicName
- PartitionID
- KafkaRecordOffset
- KafkaRecordTimestamp
- KafkaRecordKey
- KafkaRecordHeader
When Kafka Writer has been configured to add transaction context to record headers, the KafkaRecordHeader map may contain TxnID, TableName, and OperationName fields.
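Assuming a stream named RawKafkaStream populated by Kafka Reader, a continuous query can surface these metadata fields with the META function, as in this sketch (the CQ and stream names are illustrative):

```
CREATE CQ ExtractKafkaMeta
INSERT INTO KafkaMetaStream
SELECT META(x, 'TopicName') AS topicName,
       META(x, 'PartitionID') AS partitionId,
       META(x, 'KafkaRecordOffset') AS recordOffset
FROM RawKafkaStream x;
```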
Troubleshooting connection retries
During retry operations, Kafka Reader performs offset validation that may result in fatal exceptions that cannot be retried or ignored.
MissingOffsetDuringConnectionRetry (BELOW_RANGE): Kafka Reader attempts to resume from an offset below the partition's current begin offset (data no longer available). Common causes include Kafka log retention policies deleting old messages, log compaction, or application downtime exceeding the retention period.
OffsetAfterRangeDuringConnectionRetry (ABOVE_RANGE): Kafka Reader attempts to resume from an offset above the partition's current end offset (data does not exist yet). Common causes include corrupted checkpoint files, manual offset manipulation, Kafka partition truncation, or topic recreation with the same name.
Supported parsers
Kafka Reader supports the following parsers.
| Parser | Event type |
|---|---|
| Apache access log (AAL Parser) | WAEvent |
| Avro | AvroEvent |
| DSV | WAEvent |
| Free-form text | WAEvent |
| HL7v2 | XMLNodeEvent |
| JSON | JsonNodeEvent |
| Name-value pair | WAEvent |
| XML | XMLNodeEvent |
Sample application
The following sample application reads data from a Kafka topic using a connection profile and writes it to a file:
CREATE SOURCE KafkaReaderSample USING KafkaReader (
  connectionProfileName: 'admin.KafkaCP',
  topic: 'KafkaWriterSample',
  startOffset: '0'
)
PARSE USING DSVParser ()
OUTPUT TO RawKafkaStream;

CREATE TARGET KafkaReaderOut USING FileWriter (
  filename: 'KafkaReaderOutput'
)
FORMAT USING DSVFormatter ()
INPUT FROM RawKafkaStream;
Kafka property set configuration
Striim uses property sets to configure Kafka connections for persistent streams and other Kafka-related functionality. In Striim 5.4.0, the Kafka property set syntax has been updated to remove the dependency on Zookeeper and align with the official Apache Kafka API.
Property changes in Striim 5.4.0
The following changes apply to Kafka property sets in Striim 5.4.0:
- zk.address is no longer required. In previous versions, the zk.address property was mandatory because Striim used Zookeeper to create and delete Kafka topics. Striim 5.4.0 uses Kafka's AdminClient API instead, which connects directly to Kafka brokers and supports both Zookeeper mode and KRaft mode clusters.
- bootstrap.servers is the recommended property. The bootstrap.servers property aligns with the official Apache Kafka API. Because server is a reserved keyword in TQL, you must enclose 'bootstrap.servers' in single quotes.
- bootstrap.brokers continues to be supported. Existing property sets that use bootstrap.brokers will continue to work without modification.
- Existing property sets with zk.address continue to work. You do not need to modify existing property sets. The zk.address property is accepted when present but is no longer used.
Creating Kafka property sets
The following example shows the recommended syntax for creating a Kafka property set in Striim 5.4.0:
CREATE PROPERTYSET KafkaPS ('bootstrap.servers': 'localhost:9092', partitions: '250');

For backward compatibility, you can also use bootstrap.brokers:
CREATE PROPERTYSET KafkaPS (bootstrap.brokers: 'localhost:9092', partitions: '250');
Both properties are synonymous, and you should specify only one of them in a property set.
Existing property sets that include zk.address continue to work:
-- This syntax continues to work but zk.address is no longer required
CREATE PROPERTYSET KafkaPS (zk.address: 'localhost:2181', bootstrap.brokers: 'localhost:9092', partitions: '250');
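A property set such as the ones above is then referenced when persisting a stream to Kafka. The stream and type names in this sketch are illustrative:

```
-- Persist a stream to Kafka using the KafkaPS property set
CREATE STREAM PersistedStream OF Global.WAEvent
PERSIST USING KafkaPS;
```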
Configuring authentication for Kafka Reader
Kafka Reader uses a Kafka connection profile for all authentication and security configuration. The connection profile supports NONE, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI (Kerberos), Mutual TLS, and AWS MSK IAM authentication methods.
Note
The KafkaConfig, KafkaConfigPropertySeparator, and KafkaConfigValueSeparator properties used in Kafka Reader 2.1 for inline authentication configuration are no longer supported. Migrate to a Kafka connection profile.
Reading a Kafka stream with KafkaReader
For an overview of this feature, see Introducing Kafka streams.
Reading from a Kafka stream can be useful for development tasks such as doing A/B comparisons of variations on a TQL application. If you modified Samples.PosApp.tql to persist PosDataStream to Kafka, the following source would read the persisted data from Kafka.
CREATE SOURCE AccessLogSource USING KafkaReader (
  connectionProfileName: 'admin.KafkaCP',
  Topic: 'Samples_PosDataStream',
  PartitionIDList: '0',
  startOffset: 0
)
PARSE USING StriimParser ()
OUTPUT TO KafkaDSVStream;
For more information, see Kafka.