Skip to main content

TQL examples

Wildcard topic mapping

This example demonstrates automatic topic creation, primary key as message key, message key as partition key, a custom message header, wildcard multi-topic mapping, and the Confluent serializer.

-- Application: reads every table matching waction.% from MySQL and writes the
-- events to Kafka using a wildcard topic mapping ('%,%').
CREATE OR REPLACE APPLICATION DBToKafka;

-- Source: MySQL reader over all tables in the waction schema.
-- NOTE(review): the password below is a plain-text literal; other examples on
-- this page set Password_encrypted: 'true' -- confirm which form you want.
CREATE OR REPLACE SOURCE DB_Source USING Global.MysqlReader (
  DatabaseProviderType: 'Default',
  FetchSize: 100,
  adapterName: 'MysqlReader',
  QuiesceOnILCompletion: false,
  ConnectionURL: 'jdbc:mysql://localhost:3306/waction',
  Tables: 'waction.%;',
  Password: 'w@ct10n',
  Username: 'root' )
OUTPUT TO DB_Data;

-- Target: Kafka writer keyed by table name. The message key is the primary
-- key (MessageKey) and the same key is used for partitioning (PartitionKey).
-- Output is Avro-formatted with the Confluent serializer.
CREATE OR REPLACE TARGET KTWWildcardTopic USING Global.KafkaWriter (
  CheckpointTopicConfig: '{"PartitionCount":1,"ReplicationFactor":3,"CleanUp Policy":"compact","Retention Time":"2592000000","Retention Size":"1048576"}',
  adapterName: 'KafkaMultiTopicWriter',
  Serializer: 'ConfluentSerializer',
  PartitionKey: 'UseMessageKey',
  MessageHeader: 'TableName=@MetaData(TableName)',
  TopicKey: '@MetaData(TableName)',
  AutoCreateTopic: true,
  CDDLAction: 'Process',
  E1P: true,
  ConnectionRetryPolicy: 'initialRetryDelay=10s, retryDelayMultiplier=2, maxRetryDelay=1m, maxAttempts=5, totalTimeout=10m',
  Topics: '%,%',
  ConnectionProfileName: 'admin.KafkaCp',
  CommitPolicy: 'EventCount=10000;Interval=15s',
  MessageKey: 'PrimaryKey',
  CustomMessageKey: '' )
FORMAT USING Global.AvroFormatter  (
  SchemaRegistryConnectionProfileName: 'admin.confluentSchemaRegistryCP',
  useSchemaRegistryConnectionProfile: 'true',
  handler: 'com.webaction.proc.AvroFormatter',
  SchemaRegistrySubjectName: '',
  formatterName: 'AvroFormatter',
  formatAs: 'Table' )

INPUT FROM DB_Data;

END APPLICATION DBToKafka;

Explicit topic mapping, multi-partition

-- Application: reads three named tables from MySQL, adds user data to each
-- event, and writes to Kafka with an explicit table-to-topic mapping and a
-- custom partition key.
CREATE OR REPLACE APPLICATION DBToKafka;

-- Source: database reader over the three tables listed in Tables.
CREATE OR REPLACE SOURCE DB_Source USING Global.DatabaseReader (
  Tables: 'waction.Bank;waction.Person;waction.retailData;',
  DatabaseProviderType: 'Default',
  FetchSize: 100,
  adapterName: 'DatabaseReader',
  QuiesceOnILCompletion: false,
  ConnectionURL: 'jdbc:mysql://localhost:3306/waction',
  Password_encrypted: 'true',
  Password: 'B1DOvnx4vVGm7fflSQWDxw==',
  Username: 'root' )
OUTPUT TO DB_Data;

-- CQ: calls putUserData on each event to attach the 'JOB' and 'MARITAL'
-- entries, taken from data[2] and data[3], and emits to Modified_DB_Data.
CREATE OR REPLACE CQ UserdataAdder
INSERT INTO Modified_DB_Data
SELECT putUserData(x,'JOB',data[2],'MARITAL',data[3]) FROM DB_Data x;

-- Target: Kafka writer with one topic per source table. Each entry in Topics
-- is '<source table>,<topic>' and the source table names must match the
-- Tables list of DB_Source. Data topics are configured by DataTopicConfig.
CREATE OR REPLACE TARGET KTWSingleTopic USING Global.KafkaWriter (
  CheckpointTopicConfig: '{"PartitionCount":1,"ReplicationFactor":3,"CleanUp Policy":"compact","Retention Time":"2592000000","Retention Size":"1048576"}',
  E1P: false,
  adapterName: 'KafkaMultiTopicWriter',
  Serializer: 'ConfluentSerializer',
  MessageHeader: 'TableName=@MetaData(TableName)',
  TopicKey: '@MetaData(TableName)',
  AutoCreateTopic: true,
  CDDLAction: 'Process',
  DataTopicConfig: '{"PartitionCount":"3","ReplicationFactor":"4"}',
  ConnectionRetryPolicy: 'initialRetryDelay=10s, retryDelayMultiplier=2, maxRetryDelay=1m, maxAttempts=5, totalTimeout=10m',
  ConnectionProfileName: 'admin.KafkaCp',
  CommitPolicy: 'EventCount=10000;Interval=15s',
  PartitionKey: 'Custom',
  CustomPartitionKey: '@Metadata(TableName)',
  MessageKey: 'PrimaryKey',
  Topics: 'waction.Bank,Bank;waction.Person,Person;waction.retailData,retailData',
  CustomMessageKey: '' )
FORMAT USING Global.DSVFormatter  (
  quotecharacter: '\"',
  handler: 'com.webaction.proc.DSVFormatter',
  columndelimiter: ',',
  formatterName: 'DSVFormatter',
  nullvalue: 'NULL',
  usequotes: 'false',
  rowdelimiter: '\n',
  standard: 'none',
  header: 'false' )
INPUT FROM Modified_DB_Data;

END APPLICATION DBToKafka;

Single topic mapping

-- Application: parses a delimited file and writes every record to the single
-- Kafka topic 'Employee'.
CREATE OR REPLACE APPLICATION FileToKafka;

-- Source: reads bank.csv from the configured directory and parses it as
-- comma-delimited text with a header line.
CREATE OR REPLACE SOURCE FileSource USING Global.FileReader (
  adapterName: 'FileReader',
  rolloverstyle: 'Default',
  directory: '/Users/hareeswaran/Documents/CSVData',
  blocksize: 64,
  wildcard: 'bank.csv',
  skipbom: true,
  includesubdirectories: false,
  positionbyeof: false )
PARSE USING Global.DSVParser (
  trimwhitespace: false,
  linenumber: '-1',
  columndelimiter: ',',
  columndelimittill: '-1',
  trimquote: true,
  ignoreemptycolumn: false,
  separator: ':',
  parserName: 'DSVParser',
  quoteset: '\"',
  handler: 'com.webaction.proc.DSVParser_1_0',
  charset: 'UTF-8',
  ignoremultiplerecordbegin: 'true',
  ignorerowdelimiterinquote: false,
  blockascompleterecord: false,
  rowdelimiter: '\n',
  nocolumndelimiter: false,
  headerlineno: 0,
  header: true )
OUTPUT TO FileData;

-- Target: Kafka writer with a fixed topic name (Topics) and no message key
-- (MessageKey: 'None'); TopicKey is left empty.
CREATE OR REPLACE TARGET KTWSingleTopic USING Global.KafkaWriter (
  CheckpointTopicConfig: '{"PartitionCount":1,"ReplicationFactor":3,"CleanUp Policy":"compact","Retention Time":"2592000000","Retention Size":"1048576"}',
  MessageKey: 'None',
  AutoCreateTopic: true,
  CDDLAction: 'Process',
  E1P: true,
  ConnectionRetryPolicy: 'initialRetryDelay=10s, retryDelayMultiplier=2, maxRetryDelay=1m, maxAttempts=5, totalTimeout=10m',
  Topics: 'Employee',
  TopicKey: '',
  ConnectionProfileName: 'admin.KafkaCp',
  Serializer: 'StriimSerializer',
  CommitPolicy: 'EventCount=10000;Interval=15s',
  CustomMessageKey: '' )
FORMAT USING Global.DSVFormatter  (
  quotecharacter: '\"',
  handler: 'com.webaction.proc.DSVFormatter',
  columndelimiter: ',',
  formatterName: 'DSVFormatter',
  nullvalue: 'NULL',
  usequotes: 'false',
  rowdelimiter: '\n',
  standard: 'none',
  header: 'false' )
INPUT FROM FileData;

END APPLICATION FileToKafka;

Explicit topic mapping

When only a single source topic has to be mapped to a target topic.

-- Application: reads three named tables from MySQL and writes them to Kafka
-- with an explicit table-to-topic mapping and a custom message key.
CREATE OR REPLACE APPLICATION DBToKafka;

-- Source: database reader over the three tables listed in Tables.
CREATE OR REPLACE SOURCE DB_Source USING Global.DatabaseReader (
  Tables: 'waction.Bank;waction.Person;waction.retailData;',
  DatabaseProviderType: 'Default',
  FetchSize: 100,
  adapterName: 'DatabaseReader',
  QuiesceOnILCompletion: false,
  ConnectionURL: 'jdbc:mysql://localhost:3306/waction',
  Password_encrypted: 'true',
  Password: 'B1DOvnx4vVGm7fflSQWDxw==',
  Username: 'root' )
OUTPUT TO DB_Data;

-- Target: Kafka writer with one topic per source table. Each entry in Topics
-- is '<source table>,<topic>' and the source table names must match the
-- Tables list of DB_Source. The message key is built from CustomMessageKey
-- and no partition key is used (PartitionKey: 'None').
CREATE OR REPLACE TARGET KTWExplicitTopic USING Global.KafkaWriter (
  CheckpointTopicConfig: '{"PartitionCount":1,"ReplicationFactor":3,"CleanUp Policy":"compact","Retention Time":"2592000000","Retention Size":"1048576"}',
  adapterName: 'KafkaMultiTopicWriter',
  MessageKey: 'Custom',
  TopicKey: '@MetaData(TableName)',
  AutoCreateTopic: true,
  CDDLAction: 'Process',
  DataTopicConfig: '{"PartitionCount":"1","ReplicationFactor":"3"}',
  E1P: true,
  ConnectionRetryPolicy: 'initialRetryDelay=10s, retryDelayMultiplier=2, maxRetryDelay=1m, maxAttempts=5, totalTimeout=10m',
  CustomMessageKey: 'OperationName=@MetaData(OperationName)',
  ConnectionProfileName: 'admin.KafkaCp',
  Serializer: 'StriimSerializer',
  CommitPolicy: 'EventCount=10000;Interval=15s',
  PartitionKey: 'None',
  MessageHeader: 'CompanyName="Striim"',
  Topics: 'waction.Bank,Bank;waction.Person,Person;waction.retailData,retailData' )
FORMAT USING Global.AvroFormatter  (
  SchemaRegistryConnectionProfileName: 'admin.confluentSchemaRegistryCP',
  useSchemaRegistryConnectionProfile: 'true',
  formatAs: 'Native',
  SchemaRegistrySubjectName: 'UseTopicName' )

INPUT FROM DB_Data;

END APPLICATION DBToKafka;