Programmer’s reference
Kafka Writer properties
property | type | default value | notes |
|---|---|---|---|
Avro Record Namespace | String | | See Avro Formatter. |
Auto Create Topic | Boolean | True | With the default value of True, when a topic with the expected name does not exist, Kafka Writer will create it automatically. |
Checkpoint Topic | String | | Appears in UI when Exactly Once Processing is True. Name of the checkpointing topic. It is mandatory to set log.cleanup.policy to compact, set the retention time high enough to accommodate the maximum downtime of the Striim application, and use a replication factor greater than 1. If the checkpoint topic does not exist, it will be created automatically, provided topic creation is enabled. |
Checkpoint Topic Config | String | {"PartitionCount":1, "ReplicationFactor": 3, "CleanUpPolicy":"compact", "min.cleanable.dirty.ratio":"0.5", "segment.ms":"86400000", "segment.bytes":"1073741824", "min.compaction.lag.ms":"3600000", "max.compaction.lag.ms":"604800000"} | Appears in UI when Exactly Once Processing is True. By default the checkpoint topic will be created with a single partition, a replication factor of 3, and a log cleanup policy of compact. You may override these settings if required. |
Connection Profile Name | Enum | | Specify the name of the Kafka connection profile to use, or select New Connection Profile to create one. For more information, see Connection profiles. |
Connection Retry | String | initialRetryDelay=10s, retryDelayMultiplier=2, maxRetryDelay=1m, maxAttempts=5, totalTimeout=10m | With the default setting, if a connection attempt is unsuccessful, the adapter will try again in 10 seconds (initialRetryDelay=10s), and the delay is multiplied by 2 after each subsequent unsuccessful attempt (retryDelayMultiplier=2), up to a maximum delay of one minute (maxRetryDelay=1m). The adapter will halt when either the maximum number of attempts (maxAttempts) or the total timeout (totalTimeout) is reached. To disable connection retry, set these properties so that no retries are attempted. Negative values are not supported. |
Custom Message Key | Property-value pairs | | Appears in UI when Message Key is Custom. Specify one or more key-value pairs to be sent as the Kafka message key. The value can be a static string or a field from the incoming stream. |
Custom Partition Key | String | | Appears in UI when Partition Key is Custom. Specify the name of a field from an input stream of a user-defined type. |
Data Topic Config | | | When Auto Create Topic is True, optionally specify Kafka properties to be used when creating topics. |
Exactly Once Processing | Boolean | True | See E1P (exactly-once processing) and A1P (at-least-once processing). |
Format As | Enum | Default | See Avro Formatter and Schema registry use cases and examples. |
Formatter | Enum | AvroFormatter | This adapter has a choice of formatters. See Formatters for more information. |
Message Header | String | | Defines how Kafka message headers are constructed for each event. If left blank, no additional headers are added. Optionally, specify one or more key-value pairs, where each value can be a static string or a field from the input stream, for example, Table=@metadata(TableName). See Message Header use cases and examples below. |
Message Key | String | None | With the default value of None, no message key will be sent. Set to Custom to create the message keys based on one or more fields from the input stream. Set to Primary Key to create the message keys based on the values of the incoming event's primary key fields. This is applicable only for compatible database and data warehouse sources. For more information, see Message Key use cases and examples. |
Parallel Threads | Integer | | |
Partition Key | Enum | None | With the default value of None, no explicit partition key is set. Set to Custom to define a partition key based on a field from the input stream. Set to Message Key to define partition keys based on the message keys. For more information, see Partition Key use cases and examples. |
Persist Schema | Enum | ON | Appears in UI when Schema Evolution is Auto. With the default value of ON, DDL events will be written to the target as Kafka messages. Set to OFF if you do not want those messages to be written to the target. |
Schema Evolution | Enum | Auto | With the default value of Auto, the associated schema registry will be evolved with supported DDL events. (This is equivalent to CDDL Action = Process in other adapters.) Set to Manual to halt the application when a DDL event is encountered. (This is equivalent to CDDL Action = Halt in other adapters.) For more information, see Handling schema evolution. |
Schema File Name | String | | Not used by Kafka Writer. |
Schema Registry Compatibility | Enum | | See Avro Formatter and Schema registry use cases and examples. |
Schema Registry Configuration | String of key-value pairs | | See Avro Formatter and Schema registry use cases and examples. |
Schema Registry Subject Name | | | See Avro Formatter and Schema registry use cases and examples. |
Schema Registry URL | String | | See Avro Formatter and Schema registry use cases and examples. |
Serializer | Enum | Striim Serializer | If the Formatter is AvroFormatter and the downstream application expects Kafka messages written in Confluent wire format, select Confluent Serializer. Otherwise, leave set to default. For more information, see Serializer use cases and examples. |
Topic Key | | | If a wildcard or more than one topic is specified in the Topics property, specify the name of a field in the input stream to be used to distribute events among topics. For more information, see Topics and Topic Key use cases and examples. |
Topics | String | | Defines the names of the topic(s) to be written to. For more information, see Topics and Topic Key use cases and examples. |
Use Schema Registry Connection Profile | Boolean | False | Set to True to use a connection profile for the schema registry connection properties. |
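To show how these properties fit together, here is a minimal TQL sketch of a Kafka Writer target. It is illustrative only: the connection profile, stream, and topic names are placeholders, the formatter choice is arbitrary, and exact property spellings and any required VERSION clause may vary by Striim release.

```
-- Illustrative sketch only: all names are placeholders, not part of this reference.
CREATE TARGET KafkaOrdersTarget USING KafkaWriter (
  ConnectionProfileName: 'MyKafkaProfile',  -- see Connection profiles
  Topics: 'orders-topic',                   -- single topic, so Topic Key is left blank
  MessageKey: 'None',                       -- default: no message key is sent
  ExactlyOnceProcessing: 'true'             -- default: E1P with an auto-created checkpoint topic
)
FORMAT USING JSONFormatter ()
INPUT FROM OrdersStream;
```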
Topics and Topic Key use cases and examples
The following examples show how Topics and Topic Key would be specified for various use cases.
Wildcards: target topics to be created with the source table / entity names
Topic Key: @metadata(TableName)
Topics: %,%
Target topic names will be created with the exact value present in @metadata(TableName), for example, src.EMP, src.DEPT, and so on. If a source event to target topic mapping is not found, the event will be discarded. Alternatively, Topics might be specified as src.%,% or %,striim_%.
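A hedged TQL sketch of this wildcard configuration (profile and stream names are placeholders; property spellings may differ in your release):

```
-- Illustrative sketch: one target topic per source table, named from @metadata(TableName).
CREATE TARGET KafkaPerTableTarget USING KafkaWriter (
  ConnectionProfileName: 'MyKafkaProfile',
  TopicKey: '@metadata(TableName)',
  Topics: '%,%'   -- alternatively 'src.%,%' or '%,striim_%' as noted above
)
FORMAT USING JSONFormatter ()
INPUT FROM OracleCDCStream;
```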
Explicit mapping: when events are mapped to multiple topics, the Topic Key property names a field in the input stream, or a Metadata or Userdata key of the incoming event, whose values determine how events are distributed among the topics. If the value matches a mapping in Topics, the event is processed further; otherwise it is discarded.
For example, CDC records from Oracle tables are to be written to individual Kafka topics such as employee-topic, dept-topic, customer-topic, and so on:
Topic Key: @metadata(TableName)
Topics: src.EMP, employee-topic; src.DEPT, dept-topic; src.CUSTOMER, customer-topic
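The same mapping expressed as a hedged TQL sketch (names are placeholders; property spellings may differ in your release):

```
-- Illustrative sketch: explicit table-to-topic mapping for Oracle CDC events.
CREATE TARGET KafkaMappedTarget USING KafkaWriter (
  ConnectionProfileName: 'MyKafkaProfile',
  TopicKey: '@metadata(TableName)',
  Topics: 'src.EMP,employee-topic; src.DEPT,dept-topic; src.CUSTOMER,customer-topic'
)
FORMAT USING JSONFormatter ()
INPUT FROM OracleCDCStream;
```

Events whose TableName value does not match any mapping are discarded, as described above.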
MQTT Reader is sending IoT data from multiple devices such as refrigerators, ovens, and washing machines, and you want to write these device-specific events to individual topics:
Topic Key: devicetype
Topics: %,%
Single topic:
Topics: mytopic
Leave Topic Key blank.
Message Header use cases and examples
By default, messages are published without a message header. The available options are:
Users can define the Message header to have dynamic values for each Kafka message by referencing fields from the payload, metadata, or user data.
Users can define a static Message header value that is added to each Kafka message.
If multiple message headers are required, use the UI widget to add the values, or provide them as a semicolon-separated list in TQL.
Events supported:
Message headers apply to all incoming event types (JSONNodeEvent, WAEvent, AvroEvent, TypedEvent) and all formatters (DSV, XML, Avro, and JSON).
When a Message header is specified, the data is stored in Kafka headers as string-based key-value pairs.
Category | Property configuration | Kafka message header |
|---|---|---|
Static value | MessageHeader : CName="StriimCluster" | [{"key": "CName", "value": "StriimCluster"}] |
Dynamic value - WAEvent - OLTP source | MessageHeader : Table=@metadata(TableName); Operation=@metadata(OperationName); CompanyName=@userdata(companyName) | [{"key": "Table", "value": "Sch.EMP"}, {"key": "Operation", "value": "INSERT"}, {"key": "CompanyName", "value": "MyCompany"}] |
Dynamic value - file-based source | MessageHeader : File=@metadata(FileName); CompanyName=@userdata(companyName) | {"File":"UserLogs", "CompanyName":"MyCompany"} |
Dynamic value - event of user-defined type - sales transaction | MessageHeader : CityName=City; State=state | {"CityName":"Chennai", "State":"TamilNadu"} |
Mix of static and dynamic values | MessageHeader : CName="StriimCluster"; Table=@metadata(TableName); Operation=@metadata(OperationName) | {"CName":"StriimCluster", "Table":"Sch.EMP", "Operation":"INSERT"} |
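For reference, the last row above might be expressed in TQL roughly as follows. This is a hedged sketch: the target, profile, and stream names are placeholders, and the quoting of static header values may differ in your release.

```
-- Illustrative sketch: static and dynamic headers added to every Kafka message.
CREATE TARGET KafkaHeaderTarget USING KafkaWriter (
  ConnectionProfileName: 'MyKafkaProfile',
  Topics: 'orders-topic',
  MessageHeader: 'CName=StriimCluster; Table=@metadata(TableName); Operation=@metadata(OperationName)'
)
FORMAT USING JSONFormatter ()
INPUT FROM OracleCDCStream;
```

Each event is then written with three headers, stored in the Kafka message as string-based key-value pairs as shown in the table above.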