
Kafka

Note

As part of upgrading to Striim 5.4, any Kafka Reader adapters in created or quiesced applications will be upgraded to the new versions of the adapters, and corresponding connection profiles will be created. However, if after upgrading you import TQL exported from Striim 5.2 or earlier, any Kafka Reader components will use the deprecated Kafka 2.1 versions of the adapters. To use the new versions of the adapters, edit the TQL before importing to use the new properties, and create a connection profile with the appropriate properties.

Striim can read from Apache Kafka using its Kafka Reader. Kafka Reader in Striim 5.4.0 supports Kafka versions 2.0.0 to 4.0.1.

For information on configuring Kafka streams, see Reading a Kafka stream with KafkaReader and Persisting a stream to Kafka.

See Supported reader-parser combinations for parsing options.

Kafka Reader properties

Property

Type

Default value

Notes

Auto Map Partition

Boolean

True

When reading from multiple partitions, if there are multiple servers in the Striim deployment group on which KafkaReader is deployed ON ALL, partitions will be distributed automatically among the servers. Partitions will be rebalanced automatically as Striim servers are added to or removed from the group.

When deploying on a Forwarding Agent, set to False.

Broker Address

String

Specify the address of the Kafka broker in the format <IP address>:<port>.

Ignorable Exception

String

Available only in Kafka 0.11 and later versions.

You can specify a list of exceptions that Striim should ignore to avoid a system halt.

Currently, the only supported exception is "OFFSET_MISSING_EXCEPTION", which lets you avoid a system halt when there is a partition purge. A partition purge is Kafka's process of freeing up message logs after the retention period expires. Messages have a time to live (TTL) governed by the retention period properties; once the retention period expires, messages are marked for deletion according to their creation timestamp. If a message/offset has been marked for deletion and the consumer tries to read from the purged offset, Kafka throws an "OFFSET_MISSING_EXCEPTION".

Ignored exceptions will be written to the application's exception store (see CREATE EXCEPTIONSTORE).

Kafka Config

String

See notes.

Specify any properties required by the authentication method used by the specified Kafka broker (see Configuring authentication in Kafka Config).

Optionally, specify Kafka consumer properties, separated by semicolons.

When reading from a topic in Confluent Cloud, specify the appropriate SASL properties.

When messages are in Confluent wire format, specify value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer.

When each message contains one Avro record and it is not length-delimited, set value.deserializer=com.striim.avro.deserializer.SingleRecordAvroRecordDeserializer.

The default is:

max.partition.fetch.bytes=10485760;
fetch.min.bytes=1048576;
fetch.max.wait.ms=1000;
receive.buffer.bytes=2000000;
poll.timeout.ms=10000;
request.timeout.ms=60001;
session.timeout.ms=60000;
value.deserializer=
  com.striim.avro.deserializer.LengthDelimitedAvroRecordDeserializer

Kafka Config Property Separator

String

;

Available only in Kafka 0.11 and later versions. Specify a different separator if one of the property values specified in KafkaConfig contains a semicolon.

Kafka Config Value Separator

String

=

Available only in Kafka 0.11 and later versions. Specify a different separator if one of the property values specified in KafkaConfig contains an equals sign.

Partition ID List

String

Partition numbers to read from, separated by semicolons (for example, 0;1), or leave blank to read from all partitions.

Start Offset

Long

-1

With the default value of -1, Kafka Reader reads from the end of the partition. Change to 0 to read from the beginning of the partition.

If you specify startOffset, leave startTimestamp at its default value.

You can also specify per-partition values for the Start Offset and Start Timestamp properties. Specify these values as a list of key-value pairs in the format "key=value; key=value", where the keys are partition IDs and the values are the associated offsets (see the example after this table). For example:

StartOffset: '0=1; 1=0; 2=1024', where 0, 1, and 2 represent partition numbers, and the values represent the associated offsets.

Start Timestamp

String

If not specified, only new transactions (based on current Kafka host system time) are read. Specify a value in the format yyyy-MM-dd hh:mm:ss.SSS (for example, 2017-10-20 13:55:55.000) to start reading from an earlier point.

If the Kafka host and Striim host are not in the same time zone, specify the start time using the Striim host's time zone.

If you specify startTimestamp, leave startOffset at its default value.

You can also specify per-partition values for the Start Offset and Start Timestamp properties. Specify these values as a list of key-value pairs in the format "key=value; key=value", where the keys are partition IDs and the values are the associated timestamps. For example:

StartTimestamp: '0=<TS_0>;1=<TS_1>;2=<TS_2>', where 0, 1, and 2 represent partition numbers, and the values represent the associated start timestamps.

Topic

String

The name of the Kafka topic to read from. Do not modify this property when recovery is enabled for the application.
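The following source illustrates several of these properties together. This is a minimal sketch only: the broker address, topic name, partition list, and offsets are placeholders, and the adapter version shown matches the sample application later in this topic; substitute the values appropriate for your environment.

CREATE SOURCE KafkaPurchasesSource USING KafkaReader VERSION '2.1.0' (
  brokerAddress: 'localhost:9092',
  topic: 'Purchases',
  PartitionIDList: '0;1',
  startOffset: '0=1024; 1=0',
  IgnorableException: 'OFFSET_MISSING_EXCEPTION'
)
PARSE USING DSVParser ()
OUTPUT TO PurchasesStream;

In this sketch, partition 0 is read starting at offset 1024, partition 1 is read from the beginning, and partition-purge errors (OFFSET_MISSING_EXCEPTION) are written to the application's exception store instead of halting the application.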

Kafka property set configuration

Striim uses property sets to configure Kafka connections for persistent streams and other Kafka-related functionality. In Striim 5.4.0, the Kafka property set syntax has been updated to remove the dependency on Zookeeper and align with the official Apache Kafka API.

Property changes in Striim 5.4.0

The following changes apply to Kafka property sets in Striim 5.4.0:

  • zk.address is no longer required. In previous versions, the zk.address property was mandatory because Striim used Zookeeper to create and delete Kafka topics. Striim 5.4.0 uses Kafka's AdminClient API instead, which connects directly to Kafka brokers and supports both Zookeeper mode and KRaft mode clusters.

  • bootstrap.servers is the recommended property. The bootstrap.servers property aligns with the official Apache Kafka API. Because server is a reserved keyword in TQL, you must enclose 'bootstrap.servers' in single quotes.

  • bootstrap.brokers continues to be supported. Existing property sets that use bootstrap.brokers will continue to work without modification.

  • Existing property sets with zk.address continue to work. You do not need to modify existing property sets. The zk.address property is accepted when present but is no longer used.

Creating Kafka property sets

The following example shows the recommended syntax for creating a Kafka property set in Striim 5.4.0:

CREATE PROPERTYSET KafkaPS ('bootstrap.servers': 'localhost:9092', partitions: '250');

For backward compatibility, you can also use bootstrap.brokers:

CREATE PROPERTYSET KafkaPS (bootstrap.brokers: 'localhost:9092', partitions: '250');

Both properties are synonymous, and you should specify only one of them in a property set.

Existing property sets that include zk.address continue to work:

-- This syntax continues to work but zk.address is no longer required
CREATE PROPERTYSET KafkaPS (zk.address: 'localhost:2181', bootstrap.brokers: 'localhost:9092', partitions: '250');

Upgrading the embedded Kafka cluster

When upgrading from Striim 5.2.0 to Striim 5.4.0, the embedded Kafka cluster must also be upgraded from version 3.6 to 3.9. This section describes the steps required to safely perform the Kafka cluster upgrade along with the Striim upgrade.

The default Kafka version in Striim 5.4.0 is 3.9, which uses Kafka Client API 3.9.1. The Kafka 3.9 module is compatible with Kafka 3.3 and higher.

Note

If kafkaversion is not specified in Global.DefaultKafkaProperties, the Kafka 3.9 module is loaded by default. Existing applications using DefaultKafkaProperties with kafkaversion=2.1 are automatically upgraded to kafkaversion=3.9.
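For example, a property set that sets the module version explicitly might look like the following. This is a sketch only, assuming kafkaversion can be set in your own property sets the same way it appears in Global.DefaultKafkaProperties; the broker address is a placeholder, and in most cases you can simply omit kafkaversion and accept the 3.9 default:

CREATE PROPERTYSET MyKafkaProperties ('bootstrap.servers': 'localhost:9092', kafkaversion: '3.9', partitions: '250');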

Upgrading to Kafka 3.9 in Zookeeper mode

Follow this procedure if you are using the embedded Kafka cluster with Zookeeper and want to retain Zookeeper mode after upgrading to Striim 5.4.0.

Prerequisites

  • You are using the Kafka cluster shipped with Striim.

  • Your Kafka cluster is operational and running version 3.6 as bundled in Striim 5.2.0.

  • Zookeeper is used in your current configuration.

  • You have backed up your Striim configuration and data.

Upgrade procedure

  1. Stop all Striim applications that use persistent streams or Kafka-related functionality.

  2. Stop the Striim service on all nodes in your cluster.

  3. Stop the Kafka brokers in your embedded Kafka cluster.

  4. Stop the Zookeeper service.

  5. Upgrade Striim to version 5.4.0 following the standard upgrade procedure.

  6. Start the Zookeeper service.

  7. Start the Kafka brokers. The embedded Kafka 3.9 cluster will automatically use your existing Zookeeper configuration.

  8. Start the Striim service.

  9. Verify that your applications function correctly.

No changes to property sets are required when retaining Zookeeper mode. Existing applications using persistent streams will continue to work with the upgraded Kafka cluster.

Upgrading to Kafka 3.9 in KRaft mode

Follow this procedure if you are using the embedded Kafka cluster with Zookeeper and want to migrate to KRaft mode as part of your upgrade to Striim 5.4.0. KRaft mode eliminates the dependency on Zookeeper by using Kafka's built-in consensus protocol.

Prerequisites

  • You are using the Kafka cluster shipped with Striim.

  • Your Kafka cluster is operational and running version 3.6 as bundled in Striim 5.2.0.

  • Zookeeper is used in your current configuration.

  • You have backed up your Striim configuration and data.

Upgrade procedure

  1. Stop all Striim applications that use persistent streams or Kafka-related functionality.

  2. Stop the Striim service on all nodes in your cluster.

  3. Stop the Kafka brokers in your embedded Kafka cluster.

  4. Stop the Zookeeper service.

  5. Upgrade Striim to version 5.4.0 following the standard upgrade procedure.

  6. Migrate from Zookeeper to KRaft mode following the Apache Kafka KRaft documentation.

  7. Start the Kafka brokers in KRaft mode.

  8. Start the Striim service.

  9. Verify that your applications function correctly.

Note

When using KRaft mode, the zk.address property in Kafka property sets is not required. Striim 5.4.0 uses Kafka's AdminClient API to create and delete topics, which works with both Zookeeper mode and KRaft mode clusters.

Fresh Kafka installation

For new Striim installations, you can install the embedded Kafka cluster in either Zookeeper mode or KRaft mode using the installation script.

To install Kafka in KRaft mode (recommended for new installations):

./install_kafka.sh <path to kafka 3.9> kraft

To install Kafka in Zookeeper mode:

./install_kafka.sh <path to kafka 3.9>

Note

KRaft mode is recommended for new installations. Apache Kafka 4.0 removes Zookeeper support entirely, so adopting KRaft now ensures a smoother upgrade path to future Kafka versions.

Configure Kafka consumer properties for Kafka Reader and Kafka Writer

Specify the Kafka version of the broker being read using the syntax VERSION '#.#.0': for example, to read from a Kafka 2.1 or 3.3 cluster, the syntax is CREATE SOURCE <name> USING KafkaReader VERSION '2.1.0'.

The output type is WAEvent except when using Avro Parser or JSONParser.
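For example, the following source reads JSON messages from a Kafka 2.1 broker. This is a sketch only, with a placeholder broker address and topic name; because it uses JSONParser, its output type is not WAEvent:

CREATE SOURCE KafkaJSONSource USING KafkaReader VERSION '2.1.0' (
  brokerAddress: 'localhost:9092',
  topic: 'OrdersJSON',
  startOffset: '0'
)
PARSE USING JSONParser ()
OUTPUT TO OrdersJSONStream;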

Configure Kafka for a topic in Confluent Cloud

When reading from or writing to a topic in Confluent Cloud, specify the appropriate SASL properties. Kafka Writer uses Confluent's Avro serializer, which registers the schema in the Confluent Cloud schema registry and adds the schema registry ID to the corresponding Avro records in Kafka messages. Specify the Confluent Cloud configuration properties (server URL, SASL config, schema registry URL, and schema registry authentication credentials) as part of the KafkaConfig properties.

For example:

KafkaConfigPropertySeparator: ':',
KafkaConfigValueSeparator: '==',
KafkaConfig: 'max.request.size==10485760:
      batch.size==10000120:
      sasl.mechanism==PLAIN:
      schema.registry.url==https://example.us-central1.gcp.confluent.cloud:
      security.protocol==SASL_SSL:
      sasl.jaas.config==org.apache.kafka.common.security.plain.PlainLoginModule 
      required username=\"<kafka-cluster-api-key>\" 
      password=\"<kafka-cluster-api-secret>\";'

When messages are in Confluent wire format, specify value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer.
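Putting these settings together, a Kafka Reader source for a Confluent Cloud topic might look like the following sketch. The broker endpoint, topic name, and credentials are placeholders, and depending on your setup the Avro Parser may require additional properties (for example, to locate the schema registry):

CREATE SOURCE ConfluentCloudSource USING KafkaReader VERSION '2.1.0' (
  brokerAddress: 'pkc-xxxxx.us-central1.gcp.confluent.cloud:9092',
  topic: 'Orders',
  startOffset: '0',
  KafkaConfigPropertySeparator: ':',
  KafkaConfigValueSeparator: '==',
  KafkaConfig: 'security.protocol==SASL_SSL:
    sasl.mechanism==PLAIN:
    sasl.jaas.config==org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<kafka-cluster-api-key>\" password=\"<kafka-cluster-api-secret>\";:
    schema.registry.url==https://example.us-central1.gcp.confluent.cloud:
    value.deserializer==io.confluent.kafka.serializers.KafkaAvroDeserializer'
)
PARSE USING AvroParser ()
OUTPUT TO ConfluentStream;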

Sample application

The following sample application reads the data written to Kafka by the Kafka Writer sample application and writes it to striim/KR11Output.00:

CREATE SOURCE KR11Sample USING KafkaReader VERSION '2.1.0'(
  brokerAddress:'localhost:9092',
  topic:'KafkaWriterSample',
  startOffset:'0'
)
PARSE USING DSVParser ()
OUTPUT TO RawKafkaStream;

CREATE TARGET KR11Out USING FileWriter (
  filename:'KR11Output'
)
FORMAT USING DSVFormatter ()
INPUT FROM RawKafkaStream;

Configuring authentication in Kafka Config

For information on configuring Kafka streams, see Reading a Kafka stream with KafkaReader and Persisting a stream to Kafka.

Use Kafka SASL (Kerberos) authentication with SSL encryption

Caution

To avoid having passwords exposed as cleartext in Striim log messages, encrypt the KafkaConfig value using a property variable (see CREATE PROPERTYVARIABLE) or vault entry (see Using vaults).

To use SASL authentication with SSL encryption, include the following properties in your Kafka Reader or Kafka Writer KafkaConfig, adjusting the paths to match your environment and using the passwords provided by your Kafka administrator.

KafkaConfigPropertySeparator: ':',
KafkaConfigValueSeparator: '==',
KafkaConfig:'security.protocol==SASL_SSL:
  sasl.mechanism==GSSAPI:
  sasl.jaas.config==com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true
    storeKey=true doNotPrompt=true serviceName="kafka" client=true keyTab="/etc/krb5.keytab"
    principal="striim@REALM.COM";:
  sasl.kerberos.service.name==kafka:
  ssl.truststore.location==/opt/striim/kafka.truststore.jks:
  ssl.truststore.password==secret:
  ssl.keystore.location==/opt/striim/kafka.keystore.jks:
  ssl.keystore.password==secret:
  ssl.key.password==secret'

Use Kafka SASL (Kerberos) authentication without SSL encryption

To use SASL authentication without SSL encryption, include the following properties in your Kafka Reader or Kafka Writer KafkaConfig:

KafkaConfigPropertySeparator: ':',
KafkaConfigValueSeparator: '==',
KafkaConfig:'security.protocol==SASL_PLAINTEXT:
  sasl.mechanism==GSSAPI:
  sasl.jaas.config==com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true
    storeKey=true doNotPrompt=true serviceName="kafka" client=true keyTab="/etc/krb5.keytab"
    principal="striim@REALM.COM";:
  sasl.kerberos.service.name==kafka'

Using Kafka SSL encryption without SASL (Kerberos) authentication

Caution

To avoid having passwords exposed as cleartext in Striim log messages, encrypt the KafkaConfig value using a property variable (see CREATE PROPERTYVARIABLE) or vault entry (see Using vaults).

To use SSL encryption without SASL authentication, include the following properties in your Kafka stream property set or KafkaReader or KafkaWriter KafkaConfig, adjusting the paths to match your environment and using the passwords provided by your Kafka administrator.

KafkaConfigPropertySeparator: ':',
KafkaConfigValueSeparator: '==',
KafkaConfig:'security.protocol==SSL:
  ssl.truststore.location==/opt/striim/kafka.truststore.jks:
  ssl.truststore.password==secret:
  ssl.keystore.location==/opt/striim/kafka.keystore.jks:
  ssl.keystore.password==secret:
  ssl.key.password==secret'

Using Kafka without SASL (Kerberos) authentication or SSL encryption

To use neither SASL authentication nor SSL encryption, do not specify security.protocol in the KafkaReader or KafkaWriter KafkaConfig.

Reading a Kafka stream with KafkaReader

For an overview of this feature, see Introducing Kafka streams.

Reading from a Kafka stream can be useful for development tasks such as doing A/B comparisons of variations on a TQL application. If you modified Samples.PosApp.tql to persist PosDataStream to Kafka, the following source would read the persisted data from Kafka.

CREATE SOURCE AccessLogSource USING KafkaReader VERSION '0.11.0'(
  brokerAddress:'localhost:9998',
  Topic:'Samples_PosDataStream',
  PartitionIDList:'0',
  startOffset:0
)
PARSE USING StriimParser ()
OUTPUT TO KafkaDSVStream;

For more information, see Kafka Reader.