Programmer’s reference

Kafka Writer properties

property

type

default value

notes

Avro Record Namespace

String

See Avro Formatter.

Auto Create Topic

Boolean

True

With the default value of True, when a topic with the expected name does not exist, Kafka Writer will create it automatically.

  • If Topics is a wildcard, the names will be defined by the Topic Key.

  • If Topics is a user-defined list, the names will be taken from the list.

Checkpoint Topic

String

Appears in UI when Exactly Once Processing is True.

Name of the checkpointing topic. By default the name of the checkpoint topic will be <writer name>_CHECKPOINT. You may edit this property to change the name of the checkpointing topic. The checkpoint topic must be unique for each writer.

It is mandatory to have log.cleanup.policy set to compact, the retention time set high enough to accommodate the maximum downtime of the Striim application, and a replication factor greater than 1.

If the checkpoint topic does not exist, it will be created automatically, provided topic creation is enabled.

Checkpoint Topic Config

String

"{"PartitionCount":1, "ReplicationFactor": 3, "CleanUpPolicy":"compact", "min.cleanable.dirty.ratio":"0.5", "segment.ms":"86400000", "segment.bytes":"1073741824", "min.compaction.lag.ms":"3600000", "max.compaction.lag.ms":"604800000"}"

Appears in UI when Exactly Once Processing is True.

By default the checkpoint topic will be created with a single partition, a replication factor of 3, and a log cleanup policy of compact. You may override these settings if required, with the following considerations (see the sketch after this list):

  • Replication factor: set to a higher value for higher availability.

  • Cleanup policy: do not modify unless instructed to by Striim Support.

  • If Parallel Threads exceeds the PartitionCount value, the partition count will be automatically adjusted to match the Parallel Threads value.

  • If Partition Count and Replication Factor are not specified, the values for "num.partitions" and "default.replication.factor" from the Kafka broker configuration will be used. If these values are not available in the broker configuration, default fallback values will be applied.
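In plain Kafka terms, the default configuration above describes a single-partition, compacted topic. As an illustration only of what those keys map to, the following sketch creates an equivalent topic directly with the Kafka AdminClient; the broker address and the writer name are placeholders, and in practice Striim creates the checkpoint topic automatically when topic creation is enabled.

import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CheckpointTopicSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker

        try (AdminClient admin = AdminClient.create(props)) {
            // One partition, replication factor 3, compacted, mirroring the default
            // Checkpoint Topic Config shown above. "MyKafkaWriter" is a placeholder writer name.
            NewTopic checkpointTopic = new NewTopic("MyKafkaWriter_CHECKPOINT", 1, (short) 3)
                    .configs(Map.of(
                            "cleanup.policy", "compact",
                            "min.cleanable.dirty.ratio", "0.5",
                            "segment.ms", "86400000",
                            "segment.bytes", "1073741824",
                            "min.compaction.lag.ms", "3600000",
                            "max.compaction.lag.ms", "604800000"));
            admin.createTopics(List.of(checkpointTopic)).all().get();
        }
    }
}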

Connection Profile Name

Enum

Specify the name of the Kafka connection profile to use, or select New Connection Profile to create one. For more information, see Connection profiles.

Connection Retry

String

initialRetryDelay=10s, retryDelayMultiplier=2, maxRetryDelay=1m, maxAttempts=5, totalTimeout=10m

With the default setting, if a connection attempt is unsuccessful, the adapter will try again in 10 seconds (initialRetryDelay=10s). If the second attempt is unsuccessful, in 20 seconds it will try a third time (initialRetryDelay=10s multiplied by retryDelayMultiplier=2). If that fails, the adapter will try again in 40 seconds (the previous retry interval of 20 seconds multiplied by 2). If connection attempts continue to fail, the adapter will try again every 60 seconds (maxRetryDelay=1m) until a total of five connection attempts have been made (maxAttempts=5), after which the adapter will halt and log an exception.

The adapter will halt when either maxAttempts or totalTimeout is reached.

initialRetryDelay, maxRetryDelay, and totalTimeout may be specified in milliseconds (ms), seconds (s, the default), or minutes (m).

If retryDelayMultiplier is set to 1, connections will be attempted at the fixed interval set by initialRetryDelay.

To disable connection retry, set maxAttempts=0.

Negative values are not supported.
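As an illustration only (not the adapter's actual implementation), the following sketch computes the retry schedule that the default settings above produce:

import java.time.Duration;

public class RetrySchedule {
    public static void main(String[] args) {
        Duration initialRetryDelay = Duration.ofSeconds(10);
        double retryDelayMultiplier = 2.0;
        Duration maxRetryDelay = Duration.ofMinutes(1);
        int maxAttempts = 5;
        Duration totalTimeout = Duration.ofMinutes(10);

        Duration delay = initialRetryDelay;
        Duration elapsed = Duration.ZERO;
        // The first attempt happens immediately; a delay precedes each retry.
        for (int attempt = 2; attempt <= maxAttempts; attempt++) {
            if (elapsed.plus(delay).compareTo(totalTimeout) > 0) {
                System.out.println("totalTimeout reached; giving up");
                break;
            }
            elapsed = elapsed.plus(delay);
            System.out.printf("retry %d after %ds (elapsed %ds)%n",
                    attempt, delay.toSeconds(), elapsed.toSeconds());
            // Each subsequent delay grows by the multiplier, capped at maxRetryDelay.
            Duration next = Duration.ofMillis((long) (delay.toMillis() * retryDelayMultiplier));
            delay = next.compareTo(maxRetryDelay) > 0 ? maxRetryDelay : next;
        }
    }
}

Running this prints retries after 10, 20, 40, and 60 seconds, matching the description above.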

Custom Message Key

Property-value pairs

Appears in UI when Message Key is Custom.

Specify one or more key-value pairs to be sent as the Kafka message key. The value can be a static string or a field from the incoming stream. The format of the property must be Key=Value; Key=Value, for example, OperationCode=@metadata(OperationName); SourceTableName=@metadata(TableName).

Custom Partition Key

String

Appears in UI when Partition Key is Custom.

Specify the name of a field from an input stream of a user-defined type, or a metadata or userdata field in an input stream of type AVROEvent, JSONNodeEvent, or WAEvent.

Data Topic Config

When Auto Create Topic is True, optionally specify Kafka properties to be used when creating topics, for example, '{"PartitionCount":1, "ReplicationFactor":3, "CleanUp Policy":"compact", "Retention Time":"2592000000", "Retention Size":"1048576"}'

Exactly Once Processing

Boolean

True

See E1P (exactly-once processing) and A1P (at-least-once processing).

Format As

Enum

Default

See Avro Formatter and Schema registry use cases and examples.

Formatter

Enum

AvroFormatter

This adapter has a choice of formatters. See Formatters for more information.

Message Header

String

A property that defines how Kafka message headers are constructed for each event. If left blank, no additional headers are added. Optionally, specify one or more key-value pairs, where each value can be a static string or a field from the input stream, for example, OperationName=@metadata(OperationName); CompanyName=@userdata(companyName). For more information, see Message Header use cases and examples.

Message Key

String

None

With the default value of None, no message key will be sent.

Set to Custom to create the message keys based on one or more fields from the input stream.

Set to Primary Key to create the message keys based on the values of the incoming event's primary key fields. This is applicable only for compatible database and data warehouse sources.

For more information, see Message Key use cases and examples.

Parallel Threads

Integer

See Creating multiple writer instances (parallel threads).

Partition Key

Enum

None

With the default value of None, no explicit partition key is set.

Set to Custom to define a partition key based on a field from the input stream.

Set to Message Key to define partition keys based on the message keys.

For more information, see Partition Key use cases and examples.

Persist Schema

Enum

ON

Appears in UI when Schema Evolution is Auto.

With the default value of ON, DDL events will be written to the target as Kafka messages. Set to OFF if you do not want those messages to be written to the target.

Schema Evolution

Enum

Auto

With the default value of Auto, the associated schema registry will be updated to reflect supported DDL events. (This is equivalent to CDDL Action = Process in other adapters.) Set to Manual to halt the application when a DDL event is encountered. (This is equivalent to CDDL Action = Halt in other adapters.) For more information, see Handling schema evolution.

Schema File Name

String

Not used by Kafka Writer.

Schema Registry Compatibility

Enum

See Avro Formatter and Schema registry use cases and examples.

Schema Registry Configuration

String of key-value pairs

See Avro Formatter and Schema registry use cases and examples.

Schema Registry Subject Name

See Avro Formatter and Schema registry use cases and examples.

Schema Registry URL

String

See Avro Formatter and Schema registry use cases and examples.

Serializer

Enum

Striim Serializer

If the Formatter is AvroFormatter and the downstream application expects Kafka messages written in Confluent wire format, select Confluent Serializer. Otherwise, leave it set to the default. For more information, see Serializer use cases and examples.

Topic Key

If a wildcard or more than one topic is specified in the Topics property, specify the name of a field in the input stream to be used to distribute events among topics. For more information, see Topics and Topic Key use cases and examples.

Topics

String

Defines the names of the topic(s) to be written to. For more information, see Topics and Topic Key use cases and examples.

Use Schema Registry Connection Profile

Boolean

False

Set to True to use a connection profile for the schema registry connection properties.

Topics and Topic Key use cases and examples

The following examples show how Topics and Topic Key would be specified for various use cases.

  • Wildcards: target topics to be created with the source table / entity names

    • Topic Key: @metadata(TableName)

    • Topics: %,%

    Target topic names will be created with the exact value present in @metadata(TableName), for example, src.EMP, src.DEPT, etc. (see the consumer sketch after this list). If a source event to target topic mapping is not found, the event will be discarded.

    Alternatively, Topics might be specified as src.%,% or %,striim_%.

  • Explicit Mapping: When mapping to multiple topics, the Topic Key property specifies the name of a field in the input stream, or a metadata or userdata key field of the incoming event, whose values determine how events are distributed among the topics. If the value matches one of the mappings in Topics, the event will be processed; otherwise it will be discarded.

    For example, CDC records from Oracle tables are to be written to individual Kafka topics such as employee-topic, dept-topic, customer-topic, and so on:

    • Topic Key: @metadata(TableName)

    • Topics: src.EMP, employee-topic; src.DEPT, dept-topic; src.CUSTOMER, customer-topic

  • MQTT Reader is sending IoT data from multiple devices, such as refrigerators, ovens, and washing machines, and you want to write these device-specific events to individual topics:

    • Topic Key: devicetype

    • Topics: %,%

  • Single topic:

    • Topics: mytopic

    Leave Topic Key blank.
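Following on from the wildcard example above, a downstream application could consume the per-table topics (src.EMP, src.DEPT, and so on) by subscribing with a pattern. This is a minimal sketch, assuming a local broker, a hypothetical consumer group, and a formatter (such as JSON Formatter) whose output can be read as strings:

import java.time.Duration;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class WildcardTopicConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");          // placeholder broker
        props.put("group.id", "striim-topic-demo");                // placeholder consumer group
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Subscribe to every topic whose name starts with "src.", that is, the
            // per-table topics created by the wildcard Topics / Topic Key settings above.
            consumer.subscribe(Pattern.compile("src\\..*"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("%s -> %s%n", record.topic(), record.value());
                }
            }
        }
    }
}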

Message Header use cases and examples

By default, messages are published without a message header.

The following options are available:

  • Define message headers with dynamic values for each Kafka message by referencing fields from the payload, metadata, or userdata.

  • Add a static message header to each Kafka message.

  • If multiple message headers are required, add the values in the corresponding UI widget or provide them as a semicolon-separated list in TQL.

  • Supported events: message headers can be used with all incoming event types (JSONNodeEvent, WAEvent, AvroEvent, TypedEvent) and all formatters (DSV, XML, Avro, and JSON).

When a message header is specified, the data is stored in Kafka headers as string-based key-value pairs.

Category

Property configuration

Kafka message header

Static value

MessageHeader : 
CName="StriimCluster"
[
  {
    "key": "CName",
    "value": "StriimCluster"
  }
]

Dynamic value - WAEvent - OLTP source

MessageHeader : 
Table=@metadata(TableName); 
Operation=@metadata(OperationName);
CompanyName=@userdata(companyName)
[
  {
    "key": "Table",
    "value": "Sch.EMP"
  },
  {
    "key": "Operation",
    "value": "INSERT"
  },
  {
    "key": "CompanyName",
    "value": "MyCompany"
  }
]

Dynamic value - file-based source

MessageHeader : 
File=@metadata(FileName); 
CompanyName=@userdata(companyName)
{
    "File":"UserLogs",
    "CompanyName":"MyCompany"
}

Dynamic value - event of user-defined type - sales transaction

MessageHeader : 
CityName=City; 
State=state
{
    "CityName":"Chennai",
    "State":"TamilNadu"
}

Mix of static and dynamic values

MessageHeader : 
CName="StriimCluster";
Table=@metadata(TableName); 
Operation=@metadata(OperationName)
{
    "CName":"StriimCluster",
    "Table":"Sch.EMP",
    "Operation":"INSERT"
}
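Because the header data is stored as string-based key-value pairs, a downstream application can read the headers shown above with the standard Kafka consumer API. This is a minimal sketch; the broker address, consumer group, and topic name are placeholders:

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.serialization.StringDeserializer;

public class MessageHeaderReader {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
        props.put("group.id", "header-demo");                // placeholder consumer group
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("mytopic"));          // placeholder topic
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                    // Each header value is a UTF-8 string, for example Table=Sch.EMP, Operation=INSERT.
                    for (Header header : record.headers()) {
                        System.out.printf("%s=%s%n",
                                header.key(), new String(header.value(), StandardCharsets.UTF_8));
                    }
                }
            }
        }
    }
}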