TQL examples
Wildcard topic mapping
Auto Topic creation, Primary Key as Message key, Message Key as Partition key, Custom message Header, Wildcard multi topic mapping, Confluent Serializer
CREATE OR REPLACE APPLICATION DBToKafka;

-- MySQL initial-load source: the wildcard 'waction.%;' selects every table
-- in the waction schema; captured rows are emitted to the DB_Data stream.
CREATE OR REPLACE SOURCE DB_Source USING Global.MysqlReader (
  adapterName: 'MysqlReader',
  ConnectionURL: 'jdbc:mysql://localhost:3306/waction',
  Username: 'root',
  Password: 'w@ct10n',
  Tables: 'waction.%;',
  FetchSize: 100,
  DatabaseProviderType: 'Default',
  QuiesceOnILCompletion: false )
OUTPUT TO DB_Data;
-- Wildcard multi-topic target: Topics '%,%' plus TopicKey @MetaData(TableName)
-- routes each source table to a Kafka topic of the same name. Topics are
-- auto-created, the row's primary key becomes the message key, and the
-- message key drives partition assignment. Avro records are registered with
-- the Confluent schema registry (ConfluentSerializer + AvroFormatter).
CREATE OR REPLACE TARGET KTWWildcardTopic USING Global.KafkaWriter (
CheckpointTopicConfig: '{"PartitionCount":1,"ReplicationFactor":3,"CleanUp Policy":"compact","Retention Time":"2592000000","Retention Size":"1048576"}',
adapterName: 'KafkaMultiTopicWriter',
Serializer: 'ConfluentSerializer',
-- FIX: property value must be quoted and the property comma-terminated
PartitionKey: 'UseMessageKey',
MessageHeader: 'TableName=@MetaData(TableName)',
TopicKey: '@MetaData(TableName)',
AutoCreateTopic: true,
CDDLAction: 'Process',
E1P: true,
ConnectionRetryPolicy: 'initialRetryDelay=10s, retryDelayMultiplier=2, maxRetryDelay=1m, maxAttempts=5, totalTimeout=10m',
Topics: '%,%',
-- FIX: string literal was unterminated (closing quote was missing)
ConnectionProfileName: 'admin.KafkaCp',
CommitPolicy: 'EventCount=10000;Interval=15s',
MessageKey: 'PrimaryKey',
CustomMessageKey: '' )
FORMAT USING Global.AvroFormatter (
SchemaRegistryConnectionProfileName: 'admin.confluentSchemaRegistryCP',
useSchemaRegistryConnectionProfile: 'true',
handler: 'com.webaction.proc.AvroFormatter',
SchemaRegistrySubjectName: '',
formatterName: 'AvroFormatter',
formatAs: 'Table' )
INPUT FROM DB_Data;
END APPLICATION DBToKafka;

Explicit topic mapping, multi-partition
-- JDBC initial-load source for the three waction tables that the downstream
-- Kafka target maps explicitly (Bank, Person, retailData).
CREATE OR REPLACE SOURCE DB_Source USING Global.DatabaseReader (
  adapterName: 'DatabaseReader',
  ConnectionURL: 'jdbc:mysql://localhost:3306/waction',
  Username: 'root',
  Password: 'B1DOvnx4vVGm7fflSQWDxw==',
  Password_encrypted: 'true',
  Tables: 'waction.Bank;waction.Person;waction.retailData;',
  FetchSize: 100,
  DatabaseProviderType: 'Default',
  QuiesceOnILCompletion: false )
OUTPUT TO DB_Data;
-- CQ: attach JOB and MARITAL user-data fields (taken from data columns 2
-- and 3 of each event) and emit the enriched events to Modified_DB_Data.
-- FIX: added OR REPLACE for consistency with every other CREATE statement,
-- and removed the stray double semicolon that terminated the statement.
CREATE OR REPLACE CQ UserdataAdder
INSERT INTO Modified_DB_Data
SELECT putUserData(x,'JOB',data[2],'MARITAL',data[3]) FROM DB_Data x;
-- Explicit multi-topic target with custom partitioning: each source table is
-- mapped to a named Kafka topic, data topics are auto-created with 3
-- partitions, and the partition is chosen from the custom key
-- @MetaData(TableName). Output is DSV-formatted.
CREATE OR REPLACE TARGET KTWSingleTopic USING Global.KafkaWriter (
CheckpointTopicConfig: '{"PartitionCount":1,"ReplicationFactor":3,"CleanUp Policy":"compact","Retention Time":"2592000000","Retention Size":"1048576"}',
E1P: false,
adapterName: 'KafkaMultiTopicWriter',
Serializer: 'ConfluentSerializer',
MessageHeader: 'TableName=@MetaData(TableName)',
TopicKey: '@MetaData(TableName)',
AutoCreateTopic: true,
CDDLAction: 'Process',
DataTopicConfig: '{"PartitionCount":"3","ReplicationFactor":"4"}',
ConnectionRetryPolicy: 'initialRetryDelay=10s, retryDelayMultiplier=2, maxRetryDelay=1m, maxAttempts=5, totalTimeout=10m',
-- FIX: string literal was unterminated (closing quote was missing)
ConnectionProfileName: 'admin.KafkaCp',
CommitPolicy: 'EventCount=10000;Interval=15s',
-- FIX: property value must be quoted
PartitionKey: 'Custom',
-- FIX: normalized @Metadata -> @MetaData to match every other reference
CustomPartitionKey: '@MetaData(TableName)',
MessageKey: 'PrimaryKey',
-- FIX: corrected 'reatailData' typo -- the source reads waction.retailData,
-- so the misspelled mapping would never match any incoming table
Topics: 'waction.Bank,Bank;waction.Person,Person;waction.retailData,retailData',
CustomMessageKey: '' )
FORMAT USING Global.DSVFormatter (
quotecharacter: '\"',
handler: 'com.webaction.proc.DSVFormatter',
columndelimiter: ',',
formatterName: 'DSVFormatter',
nullvalue: 'NULL',
usequotes: 'false',
rowdelimiter: '\n',
standard: 'none',
header: 'false' )
INPUT FROM Modified_DB_Data;
END APPLICATION DBToKafka;

Single topic mapping
CREATE OR REPLACE APPLICATION FileToKafka;

-- File source: reads bank.csv from the local CSVData directory and parses it
-- as comma-delimited text with a header row, emitting events to FileData.
CREATE OR REPLACE SOURCE FileSource USING Global.FileReader (
  adapterName: 'FileReader',
  directory: '/Users/hareeswaran/Documents/CSVData',
  wildcard: 'bank.csv',
  blocksize: 64,
  rolloverstyle: 'Default',
  skipbom: true,
  includesubdirectories: false,
  positionbyeof: false )
PARSE USING Global.DSVParser (
  parserName: 'DSVParser',
  handler: 'com.webaction.proc.DSVParser_1_0',
  charset: 'UTF-8',
  columndelimiter: ',',
  rowdelimiter: '\n',
  quoteset: '\"',
  separator: ':',
  header: true,
  headerlineno: 0,
  linenumber: '-1',
  columndelimittill: '-1',
  trimquote: true,
  trimwhitespace: false,
  ignoreemptycolumn: false,
  ignoremultiplerecordbegin: 'true',
  ignorerowdelimiterinquote: false,
  blockascompleterecord: false,
  nocolumndelimiter: false )
OUTPUT TO FileData;
-- Single-topic target: all parsed file events go to the one 'Employee'
-- topic (TopicKey is empty, MessageKey 'None'), serialized with the Striim
-- serializer and formatted as DSV.
CREATE OR REPLACE TARGET KTWSingleTopic USING Global.KafkaWriter (
CheckpointTopicConfig: '{"PartitionCount":1,"ReplicationFactor":3,"CleanUp Policy":"compact","Retention Time":"2592000000","Retention Size":"1048576"}',
MessageKey: 'None',
AutoCreateTopic: true,
CDDLAction: 'Process',
E1P: true,
ConnectionRetryPolicy: 'initialRetryDelay=10s, retryDelayMultiplier=2, maxRetryDelay=1m, maxAttempts=5, totalTimeout=10m',
Topics: 'Employee',
TopicKey: '',
-- FIX: the closing quote was a typographic apostrophe, leaving the string unterminated
ConnectionProfileName: 'admin.KafkaCp',
Serializer: 'StriimSerializer',
CommitPolicy: 'EventCount=10000;Interval=15s',
CustomMessageKey: '' )
FORMAT USING Global.DSVFormatter (
quotecharacter: '\"',
handler: 'com.webaction.proc.DSVFormatter',
columndelimiter: ',',
formatterName: 'DSVFormatter',
nullvalue: 'NULL',
usequotes: 'false',
rowdelimiter: '\n',
standard: 'none',
header: 'false' )
INPUT FROM FileData;
END APPLICATION FileToKafka;

Explicit topic mapping
Use this when each source topic has to be mapped explicitly to a single target topic.
CREATE OR REPLACE APPLICATION DBToKafka;

-- JDBC initial-load source for the three waction tables that the downstream
-- Kafka target maps explicitly (Bank, Person, retailData).
CREATE OR REPLACE SOURCE DB_Source USING Global.DatabaseReader (
  adapterName: 'DatabaseReader',
  ConnectionURL: 'jdbc:mysql://localhost:3306/waction',
  Username: 'root',
  Password: 'B1DOvnx4vVGm7fflSQWDxw==',
  Password_encrypted: 'true',
  Tables: 'waction.Bank;waction.Person;waction.retailData;',
  FetchSize: 100,
  DatabaseProviderType: 'Default',
  QuiesceOnILCompletion: false )
OUTPUT TO DB_Data;
-- Explicit topic-mapping target: each source table is mapped to a named
-- Kafka topic; topics are auto-created with 1 partition, the message key is
-- custom (operation name), a constant header is attached, and output is
-- native Avro registered under the topic-name subject.
CREATE OR REPLACE TARGET KTWExplicitTopic USING Global.KafkaWriter (
CheckpointTopicConfig: '{"PartitionCount":1,"ReplicationFactor":3,"CleanUp Policy":"compact","Retention Time":"2592000000","Retention Size":"1048576"}',
adapterName: 'KafkaMultiTopicWriter',
MessageKey: 'Custom',
TopicKey: '@MetaData(TableName)',
AutoCreateTopic: true,
CDDLAction: 'Process',
DataTopicConfig: '{"PartitionCount":"1","ReplicationFactor":"3"}',
E1P: true,
ConnectionRetryPolicy: 'initialRetryDelay=10s, retryDelayMultiplier=2, maxRetryDelay=1m, maxAttempts=5, totalTimeout=10m',
CustomMessageKey: 'OperationName=@MetaData(OperationName)',
-- FIX: string literal was unterminated (closing quote was missing)
ConnectionProfileName: 'admin.KafkaCp',
Serializer: 'StriimSerializer',
CommitPolicy: 'EventCount=10000;Interval=15s',
PartitionKey: 'None',
MessageHeader: 'CompanyName="Striim"',
-- FIX: corrected 'reatailData' typo -- the source reads waction.retailData,
-- so the misspelled mapping would never match any incoming table
Topics: 'waction.Bank,Bank;waction.Person,Person;waction.retailData,retailData' )
FORMAT USING Global.AvroFormatter (
SchemaRegistryConnectionProfileName: 'admin.confluentSchemaRegistryCP',
useSchemaRegistryConnectionProfile: 'true',
formatAs: 'Native',
SchemaRegistrySubjectName: 'UseTopicName' )
INPUT FROM DB_Data;
END APPLICATION DBToKafka;