
Kafka Reader

Reads data from Apache Kafka 0.11, 2.1, or 3.3.2. Support for Kafka 0.8, 0.9, and 0.10 is deprecated.

For information on configuring Kafka streams, see Reading a Kafka stream with KafkaReader and Persisting a stream to Kafka.

See Supported reader-parser combinations for parsing options.

Kafka Reader properties

Auto Map Partition (Boolean, default: True)

When reading from multiple partitions, if the application is deployed ON ALL to a Striim deployment group containing multiple servers, partitions are distributed automatically among the servers and rebalanced automatically as servers are added to or removed from the group.

When deploying on a Forwarding Agent, set to False.
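
For example, a minimal sketch of a source configured for a Forwarding Agent deployment (the broker address, topic, and stream names are placeholders; property names follow the table above):

CREATE SOURCE AgentKafkaSource USING KafkaReader VERSION '2.1.0' (
  brokerAddress: 'localhost:9092',
  topic: 'MyTopic',
  AutoMapPartition: false
)
PARSE USING DSVParser ()
OUTPUT TO AgentKafkaStream;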

Broker Address (String)

Ignorable Exception (String)

Available only in Kafka 0.11 and later versions.

You can specify a list of exceptions that Striim should ignore rather than halting the application.

Currently, the only supported exception is OFFSET_MISSING_EXCEPTION, which lets you avoid a halt when a partition purge occurs. A partition purge is Kafka's process of freeing up message logs after the retention period expires: messages have a time to live (TTL) governed by the retention period properties, and on expiry they are marked for deletion based on their creation timestamp. If a consumer then tries to read from a purged offset, Kafka throws an OFFSET_MISSING_EXCEPTION.

Ignored exceptions will be written to the application's exception store (see CREATE EXCEPTIONSTORE).
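
For example, a minimal sketch (broker, topic, and stream names are placeholders) of a source that tolerates partition purges by ignoring that exception:

CREATE SOURCE PurgeTolerantSource USING KafkaReader VERSION '2.1.0' (
  brokerAddress: 'localhost:9092',
  topic: 'MyTopic',
  IgnorableException: 'OFFSET_MISSING_EXCEPTION'
)
PARSE USING DSVParser ()
OUTPUT TO PurgeTolerantStream;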

Kafka Config (String)

Specify any properties required by the authentication method used by the specified Kafka broker (see Configuring authentication in Kafka Config).

Optionally, specify Kafka consumer properties, separated by semicolons.

When reading from a topic in Confluent Cloud, specify the appropriate SASL properties.

When messages are in Confluent wire format, specify value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer.

When each message contains one Avro record and it is not length-delimited, set value.deserializer=com.striim.avro.deserializer.SingleRecordAvroRecordDeserializer.

The default is:

max.partition.fetch.bytes=10485760;
fetch.min.bytes=1048576;
fetch.max.wait.ms=1000;
receive.buffer.bytes=2000000;
poll.timeout.ms=10000
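
For example, a sketch (broker, topic, and parser settings are illustrative; the Avro parser would typically also be pointed at your schema registry) that overrides the deserializer for Confluent wire format using the default semicolon and equals separators:

CREATE SOURCE WireFormatSource USING KafkaReader VERSION '2.1.0' (
  brokerAddress: 'localhost:9092',
  topic: 'AvroTopic',
  KafkaConfig: 'value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer'
)
PARSE USING AvroParser ()
OUTPUT TO WireFormatStream;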

Kafka Config Property Separator (String, default: ;)

Available only in Kafka 0.11 and later versions. Specify a different separator if one of the property values specified in KafkaConfig contains a semicolon.

Kafka Config Value Separator (String, default: =)

Available only in Kafka 0.11 and later versions. Specify a different separator if one of the property values specified in KafkaConfig contains an equals sign.
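
For example, a sketch of alternate separators (matching the authentication examples later on this page; the SASL values are illustrative), useful when a value such as a JAAS config contains both semicolons and equals signs:

KafkaConfigPropertySeparator: ':',
KafkaConfigValueSeparator: '==',
KafkaConfig: 'security.protocol==SASL_PLAINTEXT:sasl.mechanism==PLAIN'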

Partition ID List (String)

Partition numbers to read from, separated by semicolons (for example, 0;1). Leave blank to read from all partitions.

Start Offset (Long, default: -1)

With the default value of -1, reads from the end of the partition. Change to 0 to read from the beginning of the partition.

If you specify startOffset, leave startTimestamp at its default value.

You can also specify per-partition values for the Start Offset and Start Timestamp properties as a list of key-value pairs in the format key=value; key=value, where each key is a partition ID and each value is the associated offset. For example:

StartOffset: '0=1; 1=0; 2=1024', where 0, 1, and 2 are partition numbers and the values are the associated offsets.
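
A minimal sketch (broker and topic are placeholders) putting that per-partition offset list in context:

CREATE SOURCE OffsetSource USING KafkaReader VERSION '2.1.0' (
  brokerAddress: 'localhost:9092',
  topic: 'MyTopic',
  PartitionIDList: '0;1;2',
  StartOffset: '0=1; 1=0; 2=1024'
)
PARSE USING DSVParser ()
OUTPUT TO OffsetStream;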

Start Timestamp (String)

For KafkaReader 0.10 and later only:

If not specified, only new transactions (based on current Kafka host system time) are read. Specify a value in the format yyyy-MM-dd hh:mm:ss.SSS (for example, 2017-10-20 13:55:55.000) to start reading from an earlier point.

If the Kafka host and Striim host are not in the same time zone, specify the start time using the Striim host's time zone.

If you specify startTimestamp, leave startOffset at its default value.

You can also specify per-partition values for the Start Offset and Start Timestamp properties as a list of key-value pairs in the format key=value; key=value, where each key is a partition ID and each value is the associated timestamp. For example:

StartTimestamp: '0=<TS_0>; 1=<TS_1>; 2=<TS_2>', where 0, 1, and 2 are partition numbers and the values are the associated start timestamps.
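
A minimal sketch (broker and topic are placeholders) that starts reading all partitions from a fixed point in time:

CREATE SOURCE TimestampSource USING KafkaReader VERSION '2.1.0' (
  brokerAddress: 'localhost:9092',
  topic: 'MyTopic',
  StartTimestamp: '2017-10-20 13:55:55.000'
)
PARSE USING DSVParser ()
OUTPUT TO TimestampStream;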

Topic (String)

Configure Kafka Consumer Properties for Kafka Reader and Kafka Writer

Specify the Kafka version of the broker being read using the VERSION clause (for example, VERSION '0.11.0'): to read from a Kafka 2.1 or 3.3 cluster, the syntax is CREATE SOURCE <name> USING KafkaReader VERSION '2.1.0'.

The output type is WAEvent except when using Avro Parser or JSONParser.

Configure Kafka for a topic in Confluent Cloud

When reading from or writing to a topic in Confluent Cloud, specify the appropriate SASL properties. Kafka Writer uses Confluent's Avro serializer, which registers the schema in the Confluent Cloud schema registry and adds the schema registry ID to the corresponding Avro records in Kafka messages. Specify the Confluent Cloud configuration properties (server URL, SASL config, schema registry URL, and schema registry authentication credentials) as part of the KafkaConfig properties.

For example:

KafkaConfigPropertySeparator: ':',
KafkaConfigValueSeparator: '==',
KafkaConfig: 'max.request.size==10485760:
  batch.size==10000120:
  sasl.mechanism==PLAIN:
  schema.registry.url==https://example.us-central1.gcp.confluent.cloud:
  security.protocol==SASL_SSL:
  sasl.jaas.config==org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<kafka-cluster-api-key>\" password=\"<kafka-cluster-api-secret>\";'

When messages are in Confluent wire format, specify value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer.
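
For example, a sketch for reading wire-format messages, reusing the separators and SASL values from the example above (the deserializer line is the addition; credentials and other properties are omitted for brevity):

KafkaConfigPropertySeparator: ':',
KafkaConfigValueSeparator: '==',
KafkaConfig: 'value.deserializer==io.confluent.kafka.serializers.KafkaAvroDeserializer:
  security.protocol==SASL_SSL:
  sasl.mechanism==PLAIN'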

Sample application

The following sample application reads the data written to Kafka by the Kafka Writer sample application and writes it to striim/KR11Output.00:

CREATE SOURCE KR11Sample USING KafkaReader VERSION '0.11.0'(
  brokerAddress:'localhost:9092',
  topic:'KafkaWriterSample',
  startOffset:'0'
)
PARSE USING DSVParser ()
OUTPUT TO RawKafkaStream;

CREATE TARGET KR11Out USING FileWriter (
  filename:'KR11Output'
)
FORMAT USING DSVFormatter ()
INPUT FROM RawKafkaStream;

Configuring authentication in Kafka Config

Use Kafka SASL (Kerberos) authentication with SSL encryption

To use SASL authentication with SSL encryption, include the following properties in your Kafka Reader or Kafka Writer KafkaConfig, adjusting the paths to match your environment and using the passwords provided by your Kafka administrator.

KafkaConfigPropertySeparator: ':',
KafkaConfigValueSeparator: '==',
KafkaConfig:'security.protocol==SASL_SSL:
  sasl.mechanism==GSSAPI:
  sasl.jaas.config==com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true doNotPrompt=true serviceName="kafka" client=true keyTab="/etc/krb5.keytab" principal="striim@REALM.COM";:
  sasl.kerberos.service.name==kafka:
  ssl.truststore.location==/opt/striim/kafka.truststore.jks:
  ssl.truststore.password==secret:
  ssl.keystore.location==/opt/striim/kafka.keystore.jks:
  ssl.keystore.password==secret:ssl.key.password==secret'

Use Kafka SASL (Kerberos) authentication without SSL encryption

To use SASL authentication without SSL encryption, include the following properties in your Kafka Reader or Kafka Writer KafkaConfig:

KafkaConfigPropertySeparator: ':',
KafkaConfigValueSeparator: '==',
KafkaConfig:'security.protocol==SASL_PLAINTEXT:
  sasl.mechanism==GSSAPI:
  sasl.jaas.config==com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true doNotPrompt=true serviceName="kafka" client=true keyTab="/etc/krb5.keytab" principal="striim@REALM.COM";:
  sasl.kerberos.service.name==kafka'

Use Kafka SSL encryption without SASL (Kerberos) authentication

To use SSL encryption without SASL authentication, include the following properties in your Kafka stream property set or in your KafkaReader or KafkaWriter KafkaConfig, adjusting the paths to match your environment and using the passwords provided by your Kafka administrator.

KafkaConfigPropertySeparator: ':',
KafkaConfigValueSeparator: '==',
KafkaConfig:'security.protocol==SSL:
  ssl.truststore.location==/opt/striim/kafka.truststore.jks:
  ssl.truststore.password==secret:
  ssl.keystore.location==/opt/striim/kafka.keystore.jks:
  ssl.keystore.password==secret:
  ssl.key.password==secret'

Use Kafka without SASL (Kerberos) authentication or SSL encryption

To use neither SASL authentication nor SSL encryption, do not specify security.protocol in the KafkaReader or KafkaWriter KafkaConfig.
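
For example, a minimal sketch for an unsecured broker (all names are placeholders); note that no security.protocol appears anywhere in the source:

CREATE SOURCE PlainKafkaSource USING KafkaReader VERSION '2.1.0' (
  brokerAddress: 'localhost:9092',
  topic: 'MyTopic'
)
PARSE USING DSVParser ()
OUTPUT TO PlainKafkaStream;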

Reading a Kafka stream with KafkaReader

For an overview of this feature, see Introducing Kafka streams.

Reading from a Kafka stream can be useful for development tasks such as doing A/B comparisons of variations on a TQL application. If you modified Samples.PosApp.tql to persist PosDataStream to Kafka, the following source would read the persisted data from Kafka.

CREATE SOURCE AccessLogSource USING KafkaReader VERSION '0.11.0'(
  brokerAddress:'localhost:9998',
  Topic:'Samples_PosDataStream',
  PartitionIDList:'0',
  startOffset:0
)
PARSE USING StriimParser ()
OUTPUT TO KafkaDSVStream;

For more information, see Kafka Reader.