Kafka Writer upgrade guide

This section covers things you need to know if you have Kafka Writer applications in Striim 5.2 or earlier and are upgrading or have upgraded to Striim 5.4.

Notable changes to Kafka Writer when upgrading to Striim 5.4

The new Kafka Writer requires the use of a Kafka Connection Profile.

Existing Kafka Writers from the previous version will be seamlessly upgraded to the new Kafka Writer. The connection, producer, and consumer configurations defined in the Kafka Config property will be parsed during the upgrade and migrated to the Kafka Connection Profile. Authentication settings will be automatically detected and migrated to the Kafka Connection Profile.

Kafka Connection Profiles will be created and linked to each Kafka Writer instance, with every Kafka Writer receiving its own dedicated Kafka Connection Profile.

Backward Compatibility

When Kafka Writer applications from Striim 5.2 or earlier are upgraded to 5.4, the following will continue to work as before:

  • StriimMetadata (with a null value) will be added to messages when E1P is set to true (reflecting the old KafkaWriter Sync behavior)

  • Message keys will be JSON strings

  • Avro schemas will be created based on the Striim type for both DML and DDL changes

  • Special characters will be replaced (in Avro schemas and subject names)

If you choose to migrate to the new KafkaWriter behavior, set the property “PreserveBackwardCompatibility” to FALSE.
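For example, you would change that flag in the upgraded target definition. A minimal sketch (the target, topic, stream, and connection profile names here are placeholders; all other properties stay as generated by the upgrade):

CREATE OR REPLACE TARGET MY_KAFKA_TARGET USING Global.KafkaWriter ( 
  Topics: 'MY_TOPIC', 
  connectionProfileName: 'admin.MY_KAFKA_CP', 
  PreserveBackwardCompatibility: false 
) 
FORMAT USING Global.JSONFormatter () 
INPUT FROM MY_SOURCE_STREAM;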

If you import TQL exported from Striim 5.2 or earlier, Kafka Reader and Kafka Writer will use the deprecated Kafka 2.1 versions of the adapters. To use the new versions of the adapters, edit the TQL before importing to use the new properties, and create a connection profile with the appropriate properties.

While the legacy Kafka Writer 2.1 (using Kafka client 2.1) can technically connect to Kafka 4.0 brokers, it has significant limitations:

  • Cannot use new consumer group protocol (KIP-848)

  • Cannot access Kafka 4.0+ features

  • Operates at minimum supported protocol versions

  • Potential performance degradation

If your Kafka brokers are running version 4.0 or newer, you must use the new Kafka Writer. Apache Kafka 4.0 removed support for client protocol versions older than 2.1, making the legacy Kafka Writer 2.1 incompatible with Kafka 4.0+ brokers.

Enhanced Features

The following enhanced features will be available with the new Kafka Writer behavior (PreserveBackwardCompatibility set to FALSE):

  • Schema compatibility will be NONE by default but can be altered.

  • Subject name mapping will use the default, but you may choose “UseTopicName” or “UseCustomValue” for WAEvents with all “formatAs” types in AvroFormatter (see the sketch below).
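
A minimal sketch of how these settings might look on the formatter. The “SubjectNameMapping” property name appears later in this section; the schema-compatibility property name below is an assumption, so check the Avro Formatter reference for the exact spelling:

FORMAT USING Global.AvroFormatter ( 
  formatAs: 'Native', 
  schemaregistryurl: 'https://my-schema-registry.example.com', 
  SubjectNameMapping: 'UseTopicName',  -- or 'UseCustomValue' 
  schemaCompatibility: 'BACKWARD'      -- assumed property name; the default is NONE 
)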

TQL examples before and after upgrade

After upgrading to Striim 5.4, the following TQL from Striim 5.2:

CREATE OR REPLACE TARGET KAFKA_MTLS_AVRO USING Global.KafkaWriter VERSION '2.1.0'( 
  KafkaConfigValueSeparator: '==', 
  KafkaConfigPropertySeparator: ':', 
  adapterName: 'KafkaWriter', 
  MessageKey: 'PatientHistoryFrom=@metadata(TableName)', 
  KafkaConfig: 'request.timeout.ms==60001:session.timeout.ms==60000:security.protocol==SSL:ssl.truststore.location==/Users/sanjay/confluent/cloud/mtls/client.truststore.jks:ssl.truststore.password==sanjay:ssl.keystore.location==/Users/sanjay/confluent/cloud/mtls/client.keystore.p12:ssl.keystore.password==sanjay:ssl.key.password==sanjay:ssl.keystore.type==PKCS12:value.serializer==io.confluent.kafka.serializers.KafkaAvroSerializer', 
  MessageHeader: 'HospitalName=\"UniversalHealthCare\"', 
  Mode: 'ASync', 
  Topic: 'PATIENT_RECORDS_AVRO', 
  brokerAddress: 'pkc-nyn9x6.us-east1.gcp.confluent.cloud:9092' 
) 
FORMAT USING Global.AvroFormatter  ( 
  handler: 'com.webaction.proc.AvroFormatter', 
  formatterName: 'AvroFormatter', 
  formatAs: 'Native', 
  schemaregistryurl: 'https://psrc-1ry6ve7.us-east1.gcp.confluent.cloud', 
  schemaregistryConfiguration: 'basic.auth.user.info=RKPN7PIW3I6XHODM:cflt2xAwg6Ro7HtOGz2bwv/ANiBLLqqgfRiEF75tVfIwp347VdGO6pYOpEQuNUgQ,basic.auth.credentials.source=USER_INFO' 
) 
INPUT FROM ORACLE_OP_STREAM;

Will be converted to:

CREATE OR REPLACE TARGET KAFKA_MTLS_AVRO USING Global.KafkaWriter ( 
  adapterName: 'KafkaWriter', 
  CustomMessageKey: 'PatientHistoryFrom=@metadata(TableName)', 
  MessageKey: 'Custom',
  MessageHeader: 'HospitalName=\"UniversalHealthCare\"', 
  E1P: false, 
  Topics: 'PATIENT_RECORDS_AVRO',
  PartitionKey: 'None',
  Serializer: 'ConfluentSerializer',
  connectionProfileName: 'admin.KAFKA_MTLS_AVRO_CP',
  PreserveBackwardCompatibility: true
) 
FORMAT USING Global.AvroFormatter  ( 
  handler: 'com.webaction.proc.AvroFormatter', 
  formatterName: 'AvroFormatter', 
  formatAs: 'Native', 
  schemaregistryurl: 'https://psrc-1ry6ve7.us-east1.gcp.confluent.cloud', 
  schemaregistryConfiguration: 'basic.auth.user.info=RKPN7PIW3I6XHODM:cflt2xAwg6Ro7HtOGz2bwv/ANiBLLqqgfRiEF75tVfIwp347VdGO6pYOpEQuNUgQ,basic.auth.credentials.source=USER_INFO' 
) 
INPUT FROM ORACLE_OP_STREAM;

And the connection-related properties will be moved to a new connection profile:

(Screenshot: Kafka_converted_connection_profile.png — the converted Kafka connection profile)
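
Conceptually, the profile captures the endpoint and security settings that were previously embedded in KafkaConfig. For the example above, that corresponds roughly to the following values (shown here with the original KafkaConfig keys; the actual field names in the connection profile may differ, and secrets such as the truststore and keystore passwords can be referenced from a vault):

Endpoint type:           Kafka 
Broker address:          pkc-nyn9x6.us-east1.gcp.confluent.cloud:9092 
security.protocol:       SSL 
ssl.truststore.location: /Users/sanjay/confluent/cloud/mtls/client.truststore.jks 
ssl.keystore.location:   /Users/sanjay/confluent/cloud/mtls/client.keystore.p12 
ssl.keystore.type:       PKCS12 
Producer config:         request.timeout.ms=60001, session.timeout.ms=60000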

KafkaWriter changes in 5.4

  • The new Kafka Writer gets all connection information from the connection profile, which can retrieve values from a vault and supports rotation of keys and certificates while the application is running. Any required producer configuration can be passed via the Producer Config section of the connection profile.

  • Supports a “Topics” property. Like the old “Topic” property, it accepts the name of the target topic into which all data is produced; additionally, it can take a mapping from source entity names (for example, tables for OLTP sources, collections for NoSQL sources) to target topics (see the sketch after this list).

  • The target can create topics automatically if configured to do so (topic names support the special characters Kafka allows by default).

  • Message keys can be custom values or primary keys (primary keys are supported when incoming events are from OLTP sources).

  • Incoming event payloads can be distributed across multiple partitions within a topic, based on a custom partition key or the message key.

  • Message semantics are determined by the “E1P” toggle, which by default provides exactly-once processing.

    • To ensure E1P, KafkaWriter needs an additional checkpointing topic (either pre-create it or grant the target the permissions required to create topics automatically).

  • Both the Striim and Confluent formats are still supported, configurable via the “Serializer” property.
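
Putting these together, a new-style target might look like the following sketch. The 'PrimaryKey' value and the entity-to-topic mapping mentioned in the comments are illustrative assumptions based on the descriptions above; see the Kafka Writer property reference for the exact syntax:

CREATE OR REPLACE TARGET PATIENT_TO_KAFKA USING Global.KafkaWriter ( 
  connectionProfileName: 'admin.MY_KAFKA_CP',  -- connection, auth, and producer config live in the profile 
  Topics: 'PATIENT_RECORDS',                   -- a single target topic; source-entity-to-topic mapping is also supported 
  MessageKey: 'PrimaryKey',                    -- assumed value; or 'Custom' plus CustomMessageKey 
  PartitionKey: 'None', 
  E1P: true,                                   -- exactly-once; requires a checkpointing topic 
  Serializer: 'ConfluentSerializer'            -- the Striim format is also supported 
) 
FORMAT USING Global.AvroFormatter ( 
  formatAs: 'Native', 
  schemaregistryurl: 'https://my-schema-registry.example.com' 
) 
INPUT FROM ORACLE_OP_STREAM;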

Property Comparison

| Property name - Kafka Writer 2.1 | Property name - KafkaWriter (PreserveBackwardCompatibility: True) |
| --- | --- |
| Mode: Sync (with recovery ON) | E1P: True (with recovery ON) |
| Mode: Async | E1P: False (reflects the recovery setting) |
| Topic | Topics |
| BrokerAddress, KafkaConfig | ConnectionProfile (endpoint type: Kafka) |
| MessageKey (empty) | MessageKey: NONE |
| MessageKey | MessageKey: Custom, CustomMessageKey |
| MessageHeader | MessageHeader |
| Partition Key (empty) | Partition Key: NONE |
| Partition Key | Partition Key: Custom, CustomPartitionKey |
| Parallel Threads | Parallel Threads |
| value.serializer: Confluent (specified via KafkaConfig) | Serializer: Confluent |

Note: The async producer API is used whether E1P is True or False, with the respective producer configuration applied to ensure the corresponding message semantics.

Avro Formatter changes in 5.4

  • Message key serialization with the Avro formatter will be the default for Kafka Writer. Schema generation and evolution will be tracked via the Schema Registry only; “SchemaFileName” is not supported for KafkaWriter.

  • The schema compatibility setting can be configured via the Avro formatter.

  • Subject naming: in Kafka Writer version 2.1.0 with the Avro formatter, subject name mapping was supported only for the AvroEvent and JsonNode event types. After the upgrade, “Use Topic Name” and “Use Dynamic Values” will also be supported for WAEvent with formatAs set to Native, Table, or Default.

  • Schema tracking is supported only via the Schema Registry.

Message Structure changes

  • Message Key Format - Message keys in the Kafka message payload are no longer simple JSON strings; instead, they are serialized according to the formatter. With the Avro formatter, message keys are Avro records that maintain the original data types, serialized per the "Serializer" setting. Separate schemas are also registered for message keys, with subject names based on “SubjectNameMapping” (now also supported for WAEvents).

    However, to maintain backward compatibility, the new Kafka Writer (if upgraded from an older version) will still retain message keys as JSON strings for all formatters (no key schemas will be registered).

  • StriimMetadata - In the older Kafka Writer versions 0.11 and 2.1.0, StriimMetadata was included with the data when sync mode was enabled, to support exactly-once processing semantics. The new Kafka Writer eliminates this overhead; instead, a checkpointing topic will be created (requiring topic creation permissions) if it hasn’t been pre-created by the user.

    However, to maintain backward compatibility, the new Kafka Writer (if upgraded from an older version) will still retain the StriimMetadata field (formatted according to the formatter), but the value will always be null.

Avro Serialization Changes

  • Enhanced Datatype Mapping - In Kafka Writer version 2.1.0, when using the Avro formatter with Format As: Native or Table, the names and types of fields in the “Data”/“Before” nested records are mapped based on the Striim type created by the sources for the respective tables. In the new Kafka Writer, column names and datatypes are mapped based on the schema conversion tool if the source schema flows through the pipeline, or can depend on the respective user-generated schema.

  • Schema Compatibility - The older Kafka Writer with the Avro formatter did not support schema compatibility. In the new Kafka Writer, the default remains NONE, but users have the flexibility to configure other compatibility modes as needed; the setting applies to all schemas registered by the target.

  • Aliases - In Kafka Writer versions 2.1.0 and 0.11.0, when using the Avro formatter, field names are represented in uppercase and their actual names are stored as aliases. In the new Kafka Writer, aliases are no longer maintained, and the source field names are mapped directly to the Avro field names.

To maintain backward compatibility, the new Kafka Writer (if upgraded from an older version) will create Avro records based on Striim types, schema compatibility will be NONE, and aliases will be added. Users can move to the new behavior once all consuming applications have been upgraded to handle the schema change.

Handling Special Characters

  • Kafka topics created via KafkaWriter will support dot (.), underscore (_), and hyphen (-)

  • Schemas registered via the Avro formatter will support the following:

    • Subject name

    • Underscores (_) and dots (.) in the namespace

    • Underscore (_) in the Avro record

    • Underscore (_) in the Avro field name.

In the older Kafka Writer, special characters in the subject name were converted to a mangled name format (_<Name of special character>_). In the new Kafka Writer, the application will halt if any unsupported special characters are detected during schema registration. However, to maintain backward compatibility, the new Kafka Writer (only if upgraded from an older version) will stick to the mangled naming convention.

Resource consumption changes

Kafka Writer supports exactly-once processing using Kafka’s transactions capability along with Striim’s proprietary checkpointing.

With the Striim v5.2.0 KafkaWriter, exactly-once processing came with the following additional overhead:

  • Striim’s metadata was added to the messages written to data topics.

  • Additional resources were required to maintain exactly-once processing, and restarts incurred delays.

  • There was a dependency on the sync producer.

  • Multiple events were batched into a single Kafka message (this required extra logic on the consumer side to handle more than one event per Kafka message).

This overhead does not apply to the new Kafka Writer. A checkpointing topic will be created (topic creation permissions are required) if it has not been pre-created by the user.