
Reading from and writing to Kafka using Avro

This section provides examples of using KafkaWriter with AvroFormatter and KafkaReader with AvroParser, and the various configurations that are supported. It describes the wire format (with or without a schema registry) of the Kafka message when data is produced using Striim’s KafkaWriter with AvroFormatter, and how external Kafka-based consumers or other applications can deserialize Striim’s wire format.

On the reader side, we show how KafkaReader can read Avro records in different wire formats. Only a couple of configuration changes are needed when the Kafka message contains Avro records written by an application other than Striim.

Note

In this document we use Confluent’s schema registry for schema evolution and show sample configurations for it, but other schema registries can also be used.

KafkaWriter + AvroFormatter with schema evolution

The Kafka messages produced by KafkaWriter can contain 1..n Avro records when the mode is Sync and exactly one Avro record when the mode is Async. In either mode the wire format of each Kafka message is 4 bytes giving the length of the payload, followed by the payload, whose first four bytes are the schema registry id and the remainder the Avro record bytes.
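
For reference, the sketch below shows how an external application could decode this wire format by hand. It is only an illustration of the layout described above; the lookupSchema helper is a hypothetical placeholder for a schema registry lookup, and in practice Striim’s com.striim.kafka.deserializer.KafkaAvroDeserializer (used later in this document) does this for you.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class StriimWireFormatDecoder {

    // Decodes one Kafka message value: repeated [4-byte length][4-byte schema id][Avro record bytes]
    public static List<GenericRecord> decode(byte[] message) throws Exception {
        List<GenericRecord> records = new ArrayList<GenericRecord>();
        ByteBuffer buffer = ByteBuffer.wrap(message);
        while (buffer.remaining() >= 4) {
            int payloadLength = buffer.getInt();            // 4-byte length of the payload
            int schemaId = buffer.getInt();                 // first 4 bytes of the payload: schema registry id
            byte[] avroBytes = new byte[payloadLength - 4]; // remaining payload: the Avro record bytes
            buffer.get(avroBytes);
            Schema schema = lookupSchema(schemaId);         // assumption: schema fetched from the registry
            GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
            records.add(reader.read(null, DecoderFactory.get().binaryDecoder(avroBytes, null)));
        }
        return records;
    }

    // Hypothetical placeholder: resolve a schema registry id to an Avro schema.
    private static Schema lookupSchema(int schemaId) {
        throw new UnsupportedOperationException("fetch schema " + schemaId + " from your schema registry");
    }
}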

Sample Striim application loading data from Oracle to Kafka in Avro format

Schema evolution for tables read from an OLTP source is effective only if the Avro formatter is configured with “FormatAs: Table or Native”. These modes are discussed in detail in Using the Confluent or Hortonworks schema registry. The wire format of the Kafka message is the same whether KafkaWriter is configured in Sync or Async mode with AvroFormatter using “FormatAs: Native/Table”.

The following sample code uses OracleReader to capture the continuous changes happening on an Oracle database and AvroFormatter to convert the DMLs into Avro records (close to the source table’s schema). Each table’s schema is registered in the schema registry and its id is added to every Avro record. These Avro records are written to the Kafka topic kafkaTest created in Confluent Cloud Kafka. The user must create the topic before running the application. The Confluent Cloud authentication properties are specified in the KafkaConfig property of KafkaWriter (these are not required for Apache Kafka running locally) and the schema registry authentication credentials are specified as part of the schema registry configuration in the AvroFormatter.

CREATE APPLICATION KafkaConfluentProducer RECOVERY 5 SECOND INTERVAL;

CREATE SOURCE OracleReader1 USING Global.OracleReader ( 
  TransactionBufferDiskLocation: '.striim/LargeBuffer', 
  Password: 'TxGrqYn+1TjUdQXwkEQ2UQ==', 
  DDLCaptureMode: 'All', 
  Compression: false, 
  ReaderType: 'LogMiner', 
  connectionRetryPolicy: 'timeOut=30, retryInterval=30, maxRetries=3', 
  FetchSize: 1, 
  Password_encrypted: 'true', 
  SupportPDB: false, 
  QuiesceMarkerTable: 'QUIESCEMARKER', 
  DictionaryMode: 'OnlineCatalog', 
  QueueSize: 2048, 
  CommittedTransactions: true, 
  TransactionBufferSpilloverSize: '1MB', 
  Tables: 'QATEST.POSAUTHORIZATION', 
  Username: 'qatest', 
  TransactionBufferType: 'Memory', 
  ConnectionURL: 'localhost:1521:xe', 
  FilterTransactionBoundaries: true, 
  SendBeforeImage: true ) 
OUTPUT TO outputStream12;

CREATE OR REPLACE TARGET KafkaWriter1 USING Global.KafkaWriter VERSION '2.1.0'( 
  KafkaConfigValueSeparator: '++', 
  Topic: 'kafkaTest', 
  KafkaConfig: 'max.request.size++10485760|batch.size++10000120|security.protocol++SASL_SSL|ssl.endpoint.identification.algorithm++https|sasl.mechanism++PLAIN|sasl.jaas.config++org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<CLUSTER_API_KEY>\" password=\"<CLUSTER_API_SECRET>\";', 
  KafkaConfigPropertySeparator: '|', 
  adapterName: 'KafkaWriter', 
  brokerAddress: 'pkc-4yyd6.us-east1.gcp.confluent.cloud:9092', 
  Mode: 'ASync' ) 
FORMAT USING Global.AvroFormatter  ( 
  formatAs: 'Table', 
  handler: 'com.webaction.proc.AvroFormatter', 
  formatterName: 'AvroFormatter', 
  schemaregistryurl: 'https://psrc-4rw99.us-central1.gcp.confluent.cloud',
  schemaregistryconfiguration: 'basic.auth.user.info=<SR_API_KEY>:<SR_API_PASSWORD>,basic.auth.credentials.source=USER_INFO' )
INPUT FROM outputStream12;

END APPLICATION  KafkaConfluentProducer;

Deploy and start the above application and refer to application monitoring metrics to check the progress.

Reading Avro Records in Striim wire format with schema registry

Now that the data is in the Kafka topic, it can be read by KafkaReader or any other external application. Striim’s KafkaReader does not require a deserializer to be specified in KafkaConfig; setting “SchemaRegistryURL” in the AvroParser is enough.


The following is a sample Java KafkaConsumer that takes its input from a config file containing the same configurations as provided in the Striim application: bootstrap.servers, topic.name, schemaregistry.url, basic.auth.user.info, sasl.jaas.config, security.protocol, basic.auth.credentials.source, sasl.mechanism and ssl.endpoint.identification.algorithm.

“value.deserializer” must be set to com.striim.kafka.deserializer.KafkaAvroDeserializer.

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Properties;
import java.util.List;

public class KafkaAvroConsumerUtilWithStriimDeserializer {

   private KafkaConsumer<byte[], Object> consumer;

   public KafkaAvroConsumerUtilWithStriimDeserializer(String configFileName) throws Exception {

       Properties props = new Properties();
       InputStream in = new FileInputStream(configFileName);
       props.load(in);

       this.consumer = new KafkaConsumer<byte[], Object>(props);

       TopicPartition tp = new TopicPartition(props.getProperty("topic.name"), 0);
       List<TopicPartition> tpList = new ArrayList<TopicPartition>();
       tpList.add(tp);
       this.consumer.assign(tpList);
       this.consumer.seekToBeginning(tpList);
   }

   public void consume() throws Exception {
       while(true) {
           ConsumerRecords<byte[], Object> records = consumer.poll(1000);
           for(ConsumerRecord<byte[], Object> record : records) {
               System.out.println("Topic " + record.topic() + " partition " + record.partition()
                       + " offset " + record.offset() + " timestamp " + record.timestamp());
               List<GenericRecord> avroRecordList = (List<GenericRecord>) record.value();
               for(GenericRecord avroRecord : avroRecordList) {
                   System.out.println(avroRecord);
               }
           }
       }
   }

   public void close() throws Exception {
       if(this.consumer != null) {
           this.consumer.close();
           this.consumer = null;
       }
   }

   public static void help() {
       System.out.println("Usage :\n x.sh {path_to_config_file}");
   }

   public static void main(String[] args) throws Exception {
       if(args.length != 1) {
           help();
           System.exit(-1);
       }
       String configFileName = args[0];
       System.out.println("KafkaConsumer config file : " + configFileName);
       KafkaAvroConsumerUtilWithStriimDeserializer consumerutil = null;
       try {
           consumerutil = new KafkaAvroConsumerUtilWithStriimDeserializer(configFileName);
           consumerutil.consume();
       } finally {
           if(consumerutil != null) {
               consumerutil.close();
               consumerutil = null;
           }
       }
    }
}

Include or edit the following properties in the consumer config file:

bootstrap.servers=pkc-4yyd6.us-east1.gcp.confluent.cloud:9092
topic.name=kafkaTest
schemaregistry.url=https://psrc-4rw99.us-central1.gcp.confluent.cloud
key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
value.deserializer=com.striim.kafka.deserializer.KafkaAvroDeserializer
group.id=group1
basic.auth.user.info=<SR_API_KEY>:<SR_API_SECRET>
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule   required username=\"<CLUSTER_API_KEY>\"   password=\"<CLUSTER_API_SECRET>\";
security.protocol=SASL_SSL
basic.auth.credentials.source=USER_INFO
sasl.mechanism=PLAIN
ssl.endpoint.identification.algorithm=https

Command to run KafkaAvroConsumerUtilWithStriimDeserializer:

java -jar KafkaAvroConsumerUtilWithStriimDeserializer.jar <path-to-config-file>

Sample output of KafkaAvroConsumerUtilWithStriimDeserializer (reading the content written by KafkaWriter in Async mode with AvroFormatter using FormatAs: Table and SchemaRegistryURL, whose input stream came from an OracleReader):

Topic kafkaTest partition 0 offset 0 timestamp 1599572818966
{"BUSINESS_NAME": "COMPANY 1", "MERCHANT_ID": "1", "CITY": "CBE"}
Topic kafkaTest partition 0 offset 1 timestamp 1599572818966
{"BUSINESS_NAME": "COMPANY 2", "MERCHANT_ID": "2", "CITY": "CBE"}
Topic kafkaTest partition 0 offset 2 timestamp 1599572818966
{"BUSINESS_NAME": "COMPANY 3", "MERCHANT_ID": "3", "CITY": "CBE"}
Topic kafkaTest partition 0 offset 3 timestamp 1599572818966
{"BUSINESS_NAME": "COMPANY 4", "MERCHANT_ID": "27", "CITY": "CBE"}
Topic kafkaTest partition 0 offset 4 timestamp 1599572818967
{"BUSINESS_NAME": "COMPANY 14", "MERCHANT_ID": "30", "CITY": "CBE"}
Topic kafkaTest partition 0 offset 5 timestamp 1599572818967
{"BUSINESS_NAME": "COMPANY 15", "MERCHANT_ID": "1", "CITY": "CBE"}

Reading via Kafka Reader with Avro Parser, schema registry URL specified

The sample application KafkaConfluentConsumer reads the data written to Confluent Cloud Kafka by Striim’s KafkaWriter. No change is required to the “value.deserializer” setting in KafkaConfig (the default is “com.striim.avro.deserializer.LengthDelimitedAvroRecordDeserializer”). Just add the required SASL authentication credentials to KafkaConfig. The schema registry authentication credentials are specified in the AvroParser.

CREATE APPLICATION KafkaConfluentConsumer;

CREATE OR REPLACE SOURCE KafkaAvroConsumer USING Global.KafkaReader VERSION '2.1.0' ( 
  AutoMapPartition: true, 
  KafkaConfig: 'value.deserializer++com.striim.avro.deserializer.LengthDelimitedAvroRecordDeserializer|security.protocol++SASL_SSL|ssl.endpoint.identification.algorithm++https|sasl.mechanism++PLAIN|sasl.jaas.config++org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<CLUSTER_API_KEY>\" password=\"<CLUSTER_API_SECRET>\";', 
  KafkaConfigValueSeparator: '++', 
  Topic: 'kafkaTest', 
  KafkaConfigPropertySeparator: '|', 
  brokerAddress: 'pkc-4yyd6.us-east1.gcp.confluent.cloud:9092', 
  adapterName: 'KafkaReader', 
  startOffset: 0 ) 
PARSE USING Global.AvroParser ( 
  handler: 'com.webaction.proc.AvroParser_1_0', 
  schemaregistryurl: 'https://psrc-4rw99.us-central1.gcp.confluent.cloud',
  schemaregistryconfiguration: 'basic.auth.user.info=<SR_API_KEY>:<SR_API_PASSWORD>,basic.auth.credentials.source=USER_INFO',
  parserName: 'AvroParser' ) 
OUTPUT TO outputStream1;

CREATE OR REPLACE CQ CQ1 
INSERT INTO outputStream2 
SELECT AvroToJson(o.data),
           o.metadata
 FROM outputStream1 o;

CREATE OR REPLACE TARGET FileWriter1 USING Global.FileWriter ( 
  filename: 'confluentOutput1', 
  rolloveronddl: 'false', 
  flushpolicy: 'EventCount:10000,Interval:30s', 
  encryptionpolicy: '', 
  adapterName: 'FileWriter', 
  rolloverpolicy: 'EventCount:10000,Interval:30s' ) 
FORMAT USING Global.JSONFormatter  ( 
  handler: 'com.webaction.proc.JSONFormatter', 
  jsonMemberDelimiter: '\n', 
  EventsAsArrayOfJsonObjects: 'true', 
  formatterName: 'JSONFormatter', 
  jsonobjectdelimiter: '\n' ) 
INPUT FROM outputStream2;

END APPLICATION KafkaConfluentConsumer;

Sample output of KafkaAvroConsumer (reading the content written by KafkaWriter in Async mode with AvroFormatter using FormatAs: Table and SchemaRegistryURL set):

Topic kafkaTest partition 0 offset 0 timestamp 1599572818966
{"BUSINESS_NAME": "COMPANY 1", "MERCHANT_ID": "1", "CITY": "CBE"}
Topic kafkaTest partition 0 offset 1 timestamp 1599572818966
{"BUSINESS_NAME": "COMPANY 2", "MERCHANT_ID": "2", "CITY": "CBE"}
Topic kafkaTest partition 0 offset 2 timestamp 1599572818966
{"BUSINESS_NAME": "COMPANY 3", "MERCHANT_ID": "3", "CITY": "CBE"}
Topic kafkaTest partition 0 offset 3 timestamp 1599572818966
{"BUSINESS_NAME": "COMPANY 4", "MERCHANT_ID": "27", "CITY": "CBE"}
Topic kafkaTest partition 0 offset 4 timestamp 1599572818967
{"BUSINESS_NAME": "COMPANY 14", "MERCHANT_ID": "30", "CITY": "CBE"}
Topic kafkaTest partition 0 offset 5 timestamp 1599572818967
{"BUSINESS_NAME": "COMPANY 15", "MERCHANT_ID": "1", "CITY": "CBE"}
Topic kafkaTest partition 0 offset 6 timestamp 1599572818967
{"BUSINESS_NAME": "COMPANY 16", "MERCHANT_ID": "2", "CITY": "CBE"}
Topic kafkaTest partition 0 offset 7 timestamp 1599572818967
{"BUSINESS_NAME": "COMPANY 17", "MERCHANT_ID": "3", "CITY": "CBE"}
Topic kafkaTest partition 0 offset 8 timestamp 1599572818967
{"BUSINESS_NAME": "COMPANY 18", "MERCHANT_ID": "27", "CITY": "CBE"}
Topic kafkaTest partition 0 offset 9 timestamp 1599572818967
{"BUSINESS_NAME": "COMPANY 19", "MERCHANT_ID": "28", "CITY": "CBE"}
Topic kafkaTest partition 0 offset 10 timestamp 1599572818967
{"BUSINESS_NAME": "COMPANY 20", "MERCHANT_ID": "29", "CITY": "CBE"}
Topic kafkaTest partition 0 offset 11 timestamp 1599572818967
{"BUSINESS_NAME": "COMPANY 21", "MERCHANT_ID": "30", "CITY": "CBE"}
KafkaWriter + AvroFormatter without schema evolution

Data from non-OLTP sources whose schema will not evolve during the lifetime of the Striim application can be written with KafkaWriter in Sync or Async mode and AvroFormatter with just the “SchemaFileName” property specified (the schema of the Avro records is stored in this file). Consumers must refer to the same schema file when deserializing the Avro records in the Kafka message.


The Kafka messages produced by KafkaWriter can contain 1..n Avro records when the mode is Sync and exactly one Avro record when the mode is Async. In either mode the wire format of each Kafka message is 4 bytes giving the length of the payload, followed by the payload, which contains only the Avro record bytes.
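
As a rough sketch (not Striim’s actual deserializer), the same message could be decoded by hand using the schema file written by AvroFormatter; the schema file passed in here is an assumption and should be whatever you configured in SchemaFileName.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class LengthDelimitedAvroDecoder {

    // Decodes one Kafka message value: repeated [4-byte length][Avro record bytes]
    public static List<GenericRecord> decode(byte[] message, File schemaFile) throws Exception {
        Schema schema = new Schema.Parser().parse(schemaFile);   // schema comes from the AvroFormatter schema file
        GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
        List<GenericRecord> records = new ArrayList<GenericRecord>();
        ByteBuffer buffer = ByteBuffer.wrap(message);
        while (buffer.remaining() >= 4) {
            int payloadLength = buffer.getInt();        // 4-byte length of the payload
            byte[] avroBytes = new byte[payloadLength]; // payload is just the Avro record bytes
            buffer.get(avroBytes);
            records.add(reader.read(null, DecoderFactory.get().binaryDecoder(avroBytes, null)));
        }
        return records;
    }
}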

Sample Striim application loading data from file to Kafka in Avro format with schema file name specified

The following sample application writes the CSV records from the file “posdata.csv” to the Kafka topic kafkaDSVTest in Avro format using KafkaWriter with AvroFormatter (schema file name configured). The corresponding schema is written to the schema file.

CREATE APPLICATION KafkaWriterApplication;

CREATE SOURCE FileReader1 USING Global.FileReader ( 
  rolloverstyle: 'Default', 
  wildcard: 'posdata.csv', 
  blocksize: 64, 
  skipbom: true, 
  directory: '/Users/priyankasundararajan/Product/Samples/AppData', 
  includesubdirectories: false, 
  positionbyeof: false ) 
PARSE USING Global.DSVParser ( 
  trimwhitespace: false, 
  commentcharacter: '', 
  linenumber: '-1', 
  columndelimiter: ',', 
  trimquote: true, 
  columndelimittill: '-1', 
  ignoreemptycolumn: false, 
  separator: ':', 
  quoteset: '\"', 
  charset: 'UTF-8', 
  ignoremultiplerecordbegin: 'true', 
  ignorerowdelimiterinquote: false, 
  header: false, 
  blockascompleterecord: false, 
  rowdelimiter: '\n', 
  nocolumndelimiter: false, 
  headerlineno: 0 ) 
OUTPUT TO Stream1;

CREATE TARGET kafkawriter1 USING Global.KafkaWriter VERSION '2.1.0'( 
  brokerAddress: 'pkc-4yyd6.us-east1.gcp.confluent.cloud:9092', 
  KafkaConfigValueSeparator: '++', 
  MessageKey: '', 
  MessageHeader: '',  
  ParallelThreads: '', 
  Topic: 'kafkaDSVTest', 
  KafkaConfig: 'max.request.size++10485760|batch.size++10000120|security.protocol++SASL_SSL|ssl.endpoint.identification.algorithm++https|sasl.mechanism++PLAIN|sasl.jaas.config++org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<CLUSTER_API_KEY>\" password=\"<CLUSTER_API_SECRET>\";', 
  KafkaConfigPropertySeparator: '|', 
  Mode: 'ASync' ) 
FORMAT USING Global.AvroFormatter  ( 
  schemaFileName: 'schema1', 
  formatAs: 'default' ) 
INPUT FROM Stream1;

END APPLICATION KafkaWriterApplication;

Deploy and start the above application and refer to application monitoring metrics to check the progress.

Reading Avro Records in Striim wire format with schema file

Now that the data is in the Kafka topic, it can be read by KafkaReader or any other external application. Striim’s KafkaReader does not require a deserializer to be specified in KafkaConfig; setting “SchemaFileName” in the AvroParser is enough.


The following is a sample Java KafkaConsumer that takes its input from a config file with configurations such as bootstrap.servers, topic.name, value.deserializer (provided by Striim) and schemaFileName.

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Properties;
import java.util.List;

public class KafkaAvroConsumerUtilWithSchemaFile {

   private KafkaConsumer<byte[], Object> consumer;

   public KafkaAvroConsumerUtilWithSchemaFile(String configFileName) throws Exception {


       Properties props = new Properties();
       InputStream in = new FileInputStream(configFileName);
       props.load(in);

       this.consumer = new KafkaConsumer<byte[], Object>(props);

       TopicPartition tp = new TopicPartition(props.getProperty("topic.name"), 0);
       List<TopicPartition> tpList = new ArrayList<TopicPartition>();
       tpList.add(tp);
       this.consumer.assign(tpList);
       this.consumer.seekToBeginning(tpList);
   }

   public void consume() throws Exception {
       while(true) {
           ConsumerRecords<byte[], Object> records = consumer.poll(1000);
           for(ConsumerRecord<byte[], Object> record : records) {
               System.out.println("Topic " + record.topic() + " partition " + record.partition()
                       + " offset " + record.offset() + " timestamp " + record.timestamp());
               List<GenericRecord> avroRecordList = (List<GenericRecord>) record.value();
               for(GenericRecord avroRecord : avroRecordList) {
                   System.out.println(avroRecord);
               }
           }
       }
   }

   public void close() throws Exception {
       if(this.consumer != null) {
           this.consumer.close();
           this.consumer = null;
       }
   }

   public static void help() {
       System.out.println("Usage :\n x.sh {path_to_config_file}");
   }

   public static void main(String[] args) throws Exception {
       if(args.length != 1) {
           help();
           System.exit(-1);
       }
       String configFileName = args[0];
       System.out.println("KafkaConsumer config file : " + configFileName);
       KafkaAvroConsumerUtilWithSchemaFile consumerutil = null;
       try {
           consumerutil = new KafkaAvroConsumerUtilWithSchemaFile(configFileName);
           consumerutil.consume();
       } finally {
           if(consumerutil != null) {
               consumerutil.close();
               consumerutil = null;
           }
       }
    }
}

Include the following properties in the consumer config file:

bootstrap.servers=pkc-4yyd6.us-east1.gcp.confluent.cloud:9092
topic.name=kafkaDSVTest
schemaFileName=./schema1.avsc
key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
value.deserializer=com.striim.kafka.deserializer.StriimAvroLengthDelimitedDeserializer
group.id=KafkaAvroDemoConsumer
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule   required username=\"<CLUSTER_API_KEY>\"   password=\"<CLUSTER_API_SECRET>\";
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
ssl.endpoint.identification.algorithm=https

Command to run KafkaAvroConsumerUtilWithSchemaFile:

java -jar KafkaAvroConsumerUtilWithSchemaFile.jar <path-to-config-file>

Sample output of KafkaAvroConsumerUtilWithSchemaFile, an external consumer, reading from the Kafka topic “kafkaDSVTest”:

Topic kafkaTest1 partition 0 offset 0 timestamp 1599564126433
{"metadata": {"FileOffset": "0", "RecordEnd": "138", "RecordOffset": "0", "FileName": "pos10.csv", "RecordStatus": "VALID_RECORD"}, "data": {"0": "BUSINESS NAME", "1": " MERCHANT ID", "2": " PRIMARY ACCOUNT NUMBER", "3": " POS DATA CODE", "4": " DATETIME", "5": " EXP DATE", "6": " CURRENCY CODE", "7": " AUTH AMOUNT", "8": " TERMINAL ID", "9": " ZIP", "10": " CITY"}, "before": null, "userdata": null}
Topic kafkaTest1 partition 0 offset 1 timestamp 1599564126441
{"metadata": {"FileOffset": "0", "RecordEnd": "268", "RecordOffset": "138", "FileName": "pos10.csv", "RecordStatus": "VALID_RECORD"}, "data": {"0": "COMPANY 1", "1": "D6RJPwyuLXoLqQRQcOcouJ26KGxJSf6hgbu", "2": "6705362103919221351", "3": "0", "4": "20130312173210", "5": "0916", "6": "USD", "7": "2.20", "8": "5150279519809946", "9": "41363", "10": "Quicksand"}, "before": null, "userdata": null}
Topic kafkaTest1 partition 0 offset 2 timestamp 1599564126441
{"metadata": {"FileOffset": "0", "RecordEnd": "399", "RecordOffset": "268", "FileName": "pos10.csv", "RecordStatus": "VALID_RECORD"}, "data": {"0": "COMPANY 2", "1": "OFp6pKTMg26n1iiFY00M9uSqh9ZfMxMBRf1", "2": "4710011837121304048", "3": "4", "4": "20130312173210", "5": "0815", "6": "USD", "7": "22.78", "8": "5985180438915120", "9": "16950", "10": "Westfield"}, "before": null, "userdata": null}
Topic kafkaTest1 partition 0 offset 3 timestamp 1599564126441
{"metadata": {"FileOffset": "0", "RecordEnd": "530", "RecordOffset": "399", "FileName": "pos10.csv", "RecordStatus": "VALID_RECORD"}, "data": {"0": "COMPANY 3", "1": "ljh71ujKshzWNfXMdQyN8O7vaNHlmPCCnAx", "2": "2553303262790204445", "3": "6", "4": "20130312173210", "5": "0316", "6": "USD", "7": "218.57", "8": "0663011190577329", "9": "18224", "10": "Freeland"}, "before": null, "userdata": null}
Topic kafkaTest1 partition 0 offset 4 timestamp 1599564126441
{"metadata": {"FileOffset": "0", "RecordEnd": "663", "RecordOffset": "530", "FileName": "pos10.csv", "RecordStatus": "VALID_RECORD"}, "data": {"0": "COMPANY 4", "1": "FZXC0wg0LvaJ6atJJx2a9vnfSFj4QhlOgbU", "2": "2345502971501633006", "3": "3", "4": "20130312173210", "5": "0813", "6": "USD", "7": "18.31", "8": "4959093407575064", "9": "55470", "10": "Minneapolis"}, "before": null, "userdata": null}
Topic kafkaTest1 partition 0 offset 5 timestamp 1599564126442
{"metadata": {"FileOffset": "0", "RecordEnd": "796", "RecordOffset": "663", "FileName": "pos10.csv", "RecordStatus": "VALID_RECORD"}, "data": {"0": "COMPANY 5", "1": "ljh71ujKshzWNfXMdQyN8O7vaNHlmPCCnAx", "2": "6388500771470313223", "3": "2", "4": "20130312173210", "5": "0415", "6": "USD", "7": "314.94", "8": "7116826188355220", "9": "39194", "10": "Yazoo City"}, "before": null, "userdata": null}
Topic kafkaTest1 partition 0 offset 6 timestamp 1599564126442
{"metadata": {"FileOffset": "0", "RecordEnd": "925", "RecordOffset": "796", "FileName": "pos10.csv", "RecordStatus": "VALID_RECORD"}, "data": {"0": "COMPANY 6", "1": "g6vtmaWp0CdIPEaeWfAeeu576BE7IuDk9H5", "2": "5202363682168656195", "3": "3", "4": "20130312173210", "5": "0215", "6": "USD", "7": "328.52", "8": "0497135571326680", "9": "85739", "10": "Tucson"}, "before": null, "userdata": null}
Topic kafkaLocalTest123 partition 0 offset 7 timestamp 1599564126442
{"metadata": {"FileOffset": "0", "RecordEnd": "1056", "RecordOffset": "925", "FileName": "pos10.csv", "RecordStatus": "VALID_RECORD"}, "data": {"0": "COMPANY 7", "1": "FYYQLlKwnmIor4nxrpKu0EnYXFC3aBy8oWl", "2": "8704922945605006285", "3": "0", "4": "20130312173210", "5": "0814", "6": "USD", "7": "261.11", "8": "1861218392021391", "9": "97423", "10": "Coquille"}, "before": null, "userdata": null}
Topic kafkaTest1 partition 0 offset 8 timestamp 1599564126445
{"metadata": {"FileOffset": "0", "RecordEnd": "1183", "RecordOffset": "1056", "FileName": "pos10.csv", "RecordStatus": "VALID_RECORD"}, "data": {"0": "COMPANY 8", "1": "ljh71ujKshzWNfXMdQyN8O7vaNHlmPCCnAx", "2": "1241441620952009753", "3": "1", "4": "20130312173210", "5": "0816", "6": "USD", "7": "34.29", "8": "3594534131211228", "9": "40017", "10": "Defoe"}, "before": null, "userdata": null}
Topic kafkaTest1 partition 0 offset 9 timestamp 1599564126445
{"metadata": {"FileOffset": "0", "RecordEnd": "1313", "RecordOffset": "1183", "FileName": "pos10.csv", "RecordStatus": "VALID_RECORD"}, "data": {"0": "COMPANY 9", "1": "ljh71ujKshzWNfXMdQyN8O7vaNHlmPCCnAx", "2": "2049824339216248859", "3": "2", "4": "20130312173210", "5": "0714", "6": "USD", "7": "31.51", "8": "6871027833256174", "9": "71334", "10": "Ferriday"}, "before": null, "userdata": null}

Kafka Reader + Avro Parser, schema file name specified

The sample application kafkaConfluentConsumer reads the data written to Confluent Cloud Kafka by Striim’s KafkaWriter. No change is required to the “value.deserializer” setting in KafkaConfig (the default is “com.striim.avro.deserializer.LengthDelimitedAvroRecordDeserializer”). Just add the required SASL authentication credentials to KafkaConfig and specify the “SchemaFileName” property in the AvroParser.

CREATE APPLICATION kafkaConfluentConsumer;

CREATE OR REPLACE SOURCE AvroKafkaConsumer USING Global.KafkaReader VERSION '2.1.0' ( 
  AutoMapPartition: true, 
  KafkaConfig: 'value.deserializer++com.striim.avro.deserializer.LengthDelimitedAvroRecordDeserializer|security.protocol++SASL_SSL|ssl.endpoint.identification.algorithm++https|sasl.mechanism++PLAIN|sasl.jaas.config++org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<CLUSTER_API_KEY>\" password=\"<CLUSTER_API_SECRET>\";', 
  KafkaConfigValueSeparator: '++', 
  Topic: 'kafkaDSVTest', 
  KafkaConfigPropertySeparator: '|', 
  brokerAddress: 'pkc-4yyd6.us-east1.gcp.confluent.cloud:9092', 
  adapterName: 'KafkaReader', 
  startOffset: 0 ) 
PARSE USING Global.AvroParser ( 
  handler: 'com.webaction.proc.AvroParser_1_0', 
  schemaFileName: './Product/Samples/AppData/AvroSchema/schema1.avsc', 
  parserName: 'AvroParser' ) 
OUTPUT TO outputStream1;

CREATE OR REPLACE CQ CQ1 
INSERT INTO outputStream2 
SELECT AvroToJson(o.data),
           o.metadata
 FROM outputStream1 o;

CREATE OR REPLACE TARGET FileWriter1 USING Global.FileWriter ( 
  filename: 'confluentOutput1', 
  rolloveronddl: 'false', 
  flushpolicy: 'EventCount:10000,Interval:30s', 
  encryptionpolicy: '', 
  adapterName: 'FileWriter', 
  rolloverpolicy: 'EventCount:10000,Interval:30s' ) 
FORMAT USING Global.JSONFormatter  ( 
  handler: 'com.webaction.proc.JSONFormatter', 
  jsonMemberDelimiter: '\n', 
  EventsAsArrayOfJsonObjects: 'true', 
  formatterName: 'JSONFormatter', 
  jsonobjectdelimiter: '\n' ) 
INPUT FROM outputStream2;

END APPLICATION kafkaConfluentConsumer;

Sample output of the Striim application, reading from the Kafka topic “kafkaDSVTest”:

Topic kafkaTest1 partition 0 offset 0 timestamp 1599564126433
{"metadata": {"FileOffset": "0", "RecordEnd": "138", "RecordOffset": "0", "FileName": "pos10.csv", "RecordStatus": "VALID_RECORD"}, "data": {"0": "BUSINESS NAME", "1": " MERCHANT ID", "2": " PRIMARY ACCOUNT NUMBER", "3": " POS DATA CODE", "4": " DATETIME", "5": " EXP DATE", "6": " CURRENCY CODE", "7": " AUTH AMOUNT", "8": " TERMINAL ID", "9": " ZIP", "10": " CITY"}, "before": null, "userdata": null}
Topic kafkaTest1 partition 0 offset 1 timestamp 1599564126441
{"metadata": {"FileOffset": "0", "RecordEnd": "268", "RecordOffset": "138", "FileName": "pos10.csv", "RecordStatus": "VALID_RECORD"}, "data": {"0": "COMPANY 1", "1": "D6RJPwyuLXoLqQRQcOcouJ26KGxJSf6hgbu", "2": "6705362103919221351", "3": "0", "4": "20130312173210", "5": "0916", "6": "USD", "7": "2.20", "8": "5150279519809946", "9": "41363", "10": "Quicksand"}, "before": null, "userdata": null}
Topic kafkaTest1 partition 0 offset 2 timestamp 1599564126441
{"metadata": {"FileOffset": "0", "RecordEnd": "399", "RecordOffset": "268", "FileName": "pos10.csv", "RecordStatus": "VALID_RECORD"}, "data": {"0": "COMPANY 2", "1": "OFp6pKTMg26n1iiFY00M9uSqh9ZfMxMBRf1", "2": "4710011837121304048", "3": "4", "4": "20130312173210", "5": "0815", "6": "USD", "7": "22.78", "8": "5985180438915120", "9": "16950", "10": "Westfield"}, "before": null, "userdata": null}
Topic kafkaTest1 partition 0 offset 3 timestamp 1599564126441
{"metadata": {"FileOffset": "0", "RecordEnd": "530", "RecordOffset": "399", "FileName": "pos10.csv", "RecordStatus": "VALID_RECORD"}, "data": {"0": "COMPANY 3", "1": "ljh71ujKshzWNfXMdQyN8O7vaNHlmPCCnAx", "2": "2553303262790204445", "3": "6", "4": "20130312173210", "5": "0316", "6": "USD", "7": "218.57", "8": "0663011190577329", "9": "18224", "10": "Freeland"}, "before": null, "userdata": null}
Topic kafkaTest1 partition 0 offset 4 timestamp 1599564126441
{"metadata": {"FileOffset": "0", "RecordEnd": "663", "RecordOffset": "530", "FileName": "pos10.csv", "RecordStatus": "VALID_RECORD"}, "data": {"0": "COMPANY 4", "1": "FZXC0wg0LvaJ6atJJx2a9vnfSFj4QhlOgbU", "2": "2345502971501633006", "3": "3", "4": "20130312173210", "5": "0813", "6": "USD", "7": "18.31", "8": "4959093407575064", "9": "55470", "10": "Minneapolis"}, "before": null, "userdata": null}
Topic kafkaTest1 partition 0 offset 5 timestamp 1599564126442
{"metadata": {"FileOffset": "0", "RecordEnd": "796", "RecordOffset": "663", "FileName": "pos10.csv", "RecordStatus": "VALID_RECORD"}, "data": {"0": "COMPANY 5", "1": "ljh71ujKshzWNfXMdQyN8O7vaNHlmPCCnAx", "2": "6388500771470313223", "3": "2", "4": "20130312173210", "5": "0415", "6": "USD", "7": "314.94", "8": "7116826188355220", "9": "39194", "10": "Yazoo City"}, "before": null, "userdata": null}
Topic kafkaTest1 partition 0 offset 6 timestamp 1599564126442
{"metadata": {"FileOffset": "0", "RecordEnd": "925", "RecordOffset": "796", "FileName": "pos10.csv", "RecordStatus": "VALID_RECORD"}, "data": {"0": "COMPANY 6", "1": "g6vtmaWp0CdIPEaeWfAeeu576BE7IuDk9H5", "2": "5202363682168656195", "3": "3", "4": "20130312173210", "5": "0215", "6": "USD", "7": "328.52", "8": "0497135571326680", "9": "85739", "10": "Tucson"}, "before": null, "userdata": null}
Topic kafkaLocalTest123 partition 0 offset 7 timestamp 1599564126442
{"metadata": {"FileOffset": "0", "RecordEnd": "1056", "RecordOffset": "925", "FileName": "pos10.csv", "RecordStatus": "VALID_RECORD"}, "data": {"0": "COMPANY 7", "1": "FYYQLlKwnmIor4nxrpKu0EnYXFC3aBy8oWl", "2": "8704922945605006285", "3": "0", "4": "20130312173210", "5": "0814", "6": "USD", "7": "261.11", "8": "1861218392021391", "9": "97423", "10": "Coquille"}, "before": null, "userdata": null}
Topic kafkaTest1 partition 0 offset 8 timestamp 1599564126445
{"metadata": {"FileOffset": "0", "RecordEnd": "1183", "RecordOffset": "1056", "FileName": "pos10.csv", "RecordStatus": "VALID_RECORD"}, "data": {"0": "COMPANY 8", "1": "ljh71ujKshzWNfXMdQyN8O7vaNHlmPCCnAx", "2": "1241441620952009753", "3": "1", "4": "20130312173210", "5": "0816", "6": "USD", "7": "34.29", "8": "3594534131211228", "9": "40017", "10": "Defoe"}, "before": null, "userdata": null}
Topic kafkaTest1 partition 0 offset 9 timestamp 1599564126445
{"metadata": {"FileOffset": "0", "RecordEnd": "1313", "RecordOffset": "1183", "FileName": "pos10.csv", "RecordStatus": "VALID_RECORD"}, "data": {"0": "COMPANY 9", "1": "ljh71ujKshzWNfXMdQyN8O7vaNHlmPCCnAx", "2": "2049824339216248859", "3": "2", "4": "20130312173210", "5": "0714", "6": "USD", "7": "31.51", "8": "6871027833256174", "9": "71334", "10": "Ferriday"}, "before": null, "userdata": null}
KafkaWriter + AvroFormatter with schema evolution - Confluent wire format with schema registry

Confluent Cloud is a resilient, scalable streaming data service based on Apache Kafka, delivered as a fully managed service. The only difference between a local and a cloud schema registry is the SASL setup required in the clients.

The Confluent serializer writes data in Confluent’s wire format.
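
For context, Confluent’s wire format for a single message value is one magic byte (0), a 4-byte schema registry id and then the Avro record bytes. The sketch below decodes it by hand purely for illustration; in practice io.confluent.kafka.serializers.KafkaAvroDeserializer handles this, and the lookupSchema helper here is a hypothetical placeholder for a schema registry lookup.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import java.nio.ByteBuffer;

public class ConfluentWireFormatDecoder {

    // Decodes one Kafka message value: [magic byte 0][4-byte schema id][Avro record bytes]
    public static GenericRecord decode(byte[] value) throws Exception {
        ByteBuffer buffer = ByteBuffer.wrap(value);
        byte magic = buffer.get();                       // always 0 in Confluent wire format
        if (magic != 0) {
            throw new IllegalArgumentException("Not Confluent wire format, magic byte = " + magic);
        }
        int schemaId = buffer.getInt();                  // 4-byte schema registry id
        byte[] avroBytes = new byte[buffer.remaining()]; // rest of the value is the Avro record
        buffer.get(avroBytes);
        Schema schema = lookupSchema(schemaId);          // assumption: schema fetched from the registry
        GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
        return reader.read(null, DecoderFactory.get().binaryDecoder(avroBytes, null));
    }

    // Hypothetical placeholder: resolve a schema registry id to an Avro schema.
    private static Schema lookupSchema(int schemaId) {
        throw new UnsupportedOperationException("fetch schema " + schemaId + " from your schema registry");
    }
}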

Sample Striim application loading data from Oracle to Kafka in Confluent wire format

The following sample code writes data from an Oracle database to the kafkaTest topic in Confluent Cloud. KafkaWriter uses Confluent’s Avro serializer, which registers the schema of the Avro record in the Confluent Cloud schema registry and adds the schema registry id to the respective Kafka messages. Avro records can be formatted in Confluent wire format only in Async mode, or in Sync mode with batching disabled. The user must create the topic before running the application. The Confluent Cloud authentication properties are specified in the KafkaConfig property of KafkaWriter (these are not required for Apache Kafka running locally) and the schema registry authentication credentials are specified as part of the schema registry configuration in the AvroFormatter.

Add the following configuration to KafkaConfig in KafkaWriter:

value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer

Note: If Sync mode is enabled, add the following configuration to KafkaConfig to disable batching:

batch.size=-1

Set “schemaregistryurl” and “schemaregistryconfiguration” in the Avro Formatter:

schemaregistryurl: 'https://psrc-2225o.us-central1.gcp.confluent.cloud',
schemaregistryconfiguration: 'basic.auth.user.info=<SR_API_KEY>:<SR_API_PASSWORD>,basic.auth.credentials.source=USER_INFO'

Also set the other KafkaConfig properties required for a cloud-based schema registry:

KafkaConfig: 'security.protocol++SASL_SSL|ssl.endpoint.identification.algorithm++https|sasl.mechanism++PLAIN|sasl.jaas.config++org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<CLUSTER_API_KEY>\"   password=\"<CLUSTER_API_SECRET>\";'

The following application writes Kafka messages in Confluent’s Wire format with schema registry enabled.

create application confluentWireFormatTest;
Create Source oracleSource
Using OracleReader
(
Username: '<user-name>',
Password: '<password>',
ConnectionURL: '<connection-url>',
Tables:'<table-name>',
FetchSize:1,
Compression:true
)
Output To DataStream;
create Target t using KafkaWriter VERSION '0.11.0'(
brokerAddress:'pkc-4yyd6.us-east1.gcp.confluent.cloud:9092',
mode :'ASync',
Topic:'kafkaTest',
KafkaConfig: 'value.serializer++io.confluent.kafka.serializers.KafkaAvroSerializer|security.protocol++SASL_SSL|ssl.endpoint.identification.algorithm++https|sasl.mechanism++PLAIN|sasl.jaas.config++org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<CLUSTER_API_KEY>\"   password=\"<CLUSTER_API_SECRET>\";', 
  KafkaConfigValueSeparator: '++', 
  KafkaConfigPropertySeparator: '|' 
)
format using AvroFormatter (
formatAs:'Table',
schemaregistryurl: 'https://psrc-4rw99.us-central1.gcp.confluent.cloud',
schemaregistryconfiguration:'basic.auth.user.info=<SR_API_KEY>:<SR_API_PASSWORD>,basic.auth.credentials.source=USER_INFO'
)
input from DataStream;
end application confluentWireFormatTest;

Deploy and start the above application and refer to application monitoring metrics to check the progress.

Reading Avro Records in Confluent wire format with schema registry

Now that the data is in the Kafka topic, it can be read by KafkaReader or any other external application. Both Striim’s KafkaReader and external applications require the io.confluent.kafka.serializers.KafkaAvroDeserializer deserializer to be specified in KafkaConfig. The schema registry URL and schema registry authentication credentials must also be specified.


The following is a sample Java KafkaConsumer that takes its input from a config file with configurations such as bootstrap.servers, topic.name, value.deserializer (the Confluent deserializer), schemaregistry.url, sasl.jaas.config, security.protocol, sasl.mechanism, ssl.endpoint.identification.algorithm and basic.auth.user.info.

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Properties;
import java.util.List;

public class KafkaAvroConsumerUtilConfluentWireFormat {

   private KafkaConsumer<byte[], Object> consumer;

   public KafkaAvroConsumerUtilConfluentWireFormat(String configFileName) throws Exception {


       Properties props = new Properties();
       InputStream in = new FileInputStream(configFileName);
       props.load(in);

       this.consumer = new KafkaConsumer<byte[], Object>(props);

       TopicPartition tp = new TopicPartition(props.getProperty("topic.name"), 0);
       List<TopicPartition> tpList = new ArrayList<TopicPartition>();
       tpList.add(tp);
       this.consumer.assign(tpList);
       this.consumer.seekToBeginning(tpList);
   }

   public void consume() throws Exception {
       while(true) {
           ConsumerRecords<byte[], Object> records = consumer.poll(1000);
           for(ConsumerRecord<byte[], Object> record : records) {
               System.out.println("Topic " + record.topic() + " partition " + record.partition()
                       + " offset " + record.offset() + " timestamp " + record.timestamp());
                // Confluent's KafkaAvroDeserializer returns one GenericRecord per message
                GenericRecord avroRecord = (GenericRecord) record.value();
                System.out.println(avroRecord);
           }
       }
   }

   public void close() throws Exception {
       if(this.consumer != null) {
           this.consumer.close();
           this.consumer = null;
       }
   }

   public static void main(String[] args) throws Exception {
       String configFileName = args[0];
       System.out.println("KafkaConsumer config file : " + configFileName);
        KafkaAvroConsumerUtilConfluentWireFormat consumerutil = null;
       try {
           consumerutil = new KafkaAvroConsumerUtilConfluentWireFormat(configFileName);
           consumerutil.consume();
       } finally {
           if(consumerutil != null) {
               consumerutil.close();
               consumerutil = null;
           }
       }
    }
}

Include the following properties in the consumer config file:

bootstrap.servers=pkc-4yyd6.us-east1.gcp.confluent.cloud:9092
topic.name=kafkaTest
schemaregistry.url=https://psrc-4rw99.us-central1.gcp.confluent.cloud
key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
group.id=group1
basic.auth.user.info=<SR_API_KEY>:<SR_API_SECRET>
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule   required username=\"<CLUSTER_API_KEY>\"   password=\"<CLUSTER_API_SECRET>\";
security.protocol=SASL_SSL
basic.auth.credentials.source=USER_INFO
sasl.mechanism=PLAIN
ssl.endpoint.identification.algorithm=https

Command to run KafkaAvroConsumerUtilConfluentWireFormat:

java -jar KafkaAvroConsumerUtilConfluentWireFormat.jar <path-to-config-file>

Sample output of KafkaAvroConsumerUtilConfluentWireFormat, an external consumer, reading from the Kafka topic “kafkaTest”:

offset = 0, key = null, value = {"ID": 301, "STUDENT": "jack301", "AGE": 12} 
offset = 1, key = null, value = {"ID": 302, "STUDENT": "jack302", "AGE": 12} 
offset = 2, key = null, value = {"ID": 303, "STUDENT": "jack303", "AGE": 12} 
offset = 3, key = null, value = {"ID": 304, "STUDENT": "jack304", "AGE": 12} 
offset = 4, key = null, value = {"ID": 305, "STUDENT": "jack305", "AGE": 12} 

Reading via Kafka Reader with Avro Parser, schema registry specified

The sample application KafkaConfluentConsumer reads the data written to Confluent Cloud Kafka by Striim’s KafkaWriter in Confluent wire format. The “value.deserializer” setting in KafkaConfig has to be changed to “io.confluent.kafka.serializers.KafkaAvroDeserializer”. Add the required SASL authentication credentials to KafkaConfig and specify the schema registry URL and schema registry authentication credentials in the Avro Parser.

Add the following configuration to Kafka Config in KafkaReader:

value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer

Set “schemaregistryurl” and “schemaregistryconfiguration” in the Avro Parser:

schemaregistryurl: 'https://psrc-4rw99.us-central1.gcp.confluent.cloud',
schemaregistryconfiguration: 'basic.auth.user.info=<SR_API_KEY>:<SR_API_PASSWORD>,basic.auth.credentials.source=USER_INFO'

For a cloud-based schema registry, also set KafkaConfig:

KafkaConfig: 'max.request.size==10485760:batch.size==10000120:sasl.mechanism==PLAIN:security.protocol==SASL_SSL:sasl.jaas.config==org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<kafka-cluster-api-key>\" password=\"<kafka-cluster-api-secret>\";'

The following application reads Kafka messages in Confluent’s wire format with the schema registry enabled.

CREATE APPLICATION KafkaConfluentConsumer;

CREATE OR REPLACE SOURCE ConfluentKafkaAvroConsumer USING Global.KafkaReader VERSION '2.1.0' ( 
  AutoMapPartition: true, 
  KafkaConfigValueSeparator: '++', 
  KafkaConfig: 'value.deserializer++io.confluent.kafka.serializers.KafkaAvroDeserializer|security.protocol++SASL_SSL|ssl.endpoint.identification.algorithm++https|sasl.mechanism++PLAIN|sasl.jaas.config++org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<CLUSTER_API_KEY>\" password=\"<CLUSTER_API_SECRET>\";', 
  Topic: 'kafkaTest', 
  KafkaConfigPropertySeparator: '|', 
  brokerAddress: 'pkc-4yyd6.us-east1.gcp.confluent.cloud:9092', 
  adapterName: 'KafkaReader', 
  startOffset: 0 ) 
PARSE USING Global.AvroParser ( 
  handler: 'com.webaction.proc.AvroParser_1_0', 
  schemaregistryurl: 'https://psrc-4rw99.us-central1.gcp.confluent.cloud', 
  schemaregistryconfiguration: 'basic.auth.user.info=<SR_API_KEY>:<SR_API_PASSWORD>,basic.auth.credentials.source=USER_INFO',
  parserName: 'AvroParser' ) 
OUTPUT TO outputStream1;

CREATE OR REPLACE CQ CQ1 
INSERT INTO outputStream2 
SELECT AvroToJson(o.data),
           o.metadata
 FROM outputStream1 o;

CREATE OR REPLACE TARGET FileWriter1 USING Global.FileWriter ( 
  filename: 'confluentOutput1', 
  rolloveronddl: 'false', 
  flushpolicy: 'EventCount:10000,Interval:30s', 
  encryptionpolicy: '', 
  adapterName: 'FileWriter', 
  rolloverpolicy: 'EventCount:10000,Interval:30s' ) 
FORMAT USING Global.JSONFormatter  ( 
  handler: 'com.webaction.proc.JSONFormatter', 
  jsonMemberDelimiter: '\n', 
  EventsAsArrayOfJsonObjects: 'true', 
  formatterName: 'JSONFormatter', 
  jsonobjectdelimiter: '\n' ) 
INPUT FROM outputStream2;

END APPLICATION KafkaConfluentConsumer;

Sample data read by Kafka Reader with Avro Parser from the Kafka topic “kafkaTest”:

[
 {
  "ID":301,"STUDENT":"jack301","AGE":12
 },
 {
  "ID":302,"STUDENT":"jack302","AGE":12
 },
 {
  "ID":303,"STUDENT":"jack303","AGE":12
 }
 ]
KafkaReader + AvroParser reading Confluent wire format

The following producer uses Confluent’s Avro serializer, which registers the schema in the Confluent Cloud schema registry and adds the schema registry id to the respective Avro records in the Kafka messages. The Confluent Cloud configuration properties (server URL, SASL config, schema registry URL, schema registry authentication credentials) are specified as part of the KafkaProducer properties. It could be any other application that produces the data using “io.confluent.kafka.serializers.KafkaAvroSerializer” as the value.serializer.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.SerializationException;
import java.io.File;
import java.io.IOException;
import java.util.Properties;

public class ConfluentKafkaProducer  {
    public static void main(String[] args) throws IOException {
        if(args.length != 1) {
           help();
           System.exit(-1);
        }
        String schemaFilename = args[0];
        Properties props = new Properties();
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                org.apache.kafka.common.serialization.StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                io.confluent.kafka.serializers.KafkaAvroSerializer.class);       
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"pkc-4yyd6.us-east1.gcp.confluent.cloud:9092");
        props.put("schema.registry.url", "https://psrc-4rw99.us-central1.gcp.confluent.cloud");
        props.put("security.protocol","SASL_SSL");
        props.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule   required         username=\"<CLUSTER_API_KEY>\"   password=\"<CLUSTER_API_SECRET>\";");
        props.put("ssl.endpoint.identification.algorithm","https");
        props.put("sasl.mechanism","PLAIN");
        props.put("basic.auth.credentials.source","USER_INFO");
        props.put("basic.auth.user.info","<SR_API_KEY>:<SR_API_SECRET>");
         
        Schema.Parser parser = new Schema.Parser();
        
        /* Schema used in this sample
         * {
         *   "namespace": "mytype.avro",
         *  "type" : "record",
         *   "name": "Array_Record",
         *  "fields": [
         *   {"name" : "ID", "type" : [ "null" , "int" ] },
         *   {"name" : "Name", "type" : [ "null" , "string" ] }
         *  ]
         * }
         */
        Schema schema = parser.parse(new File(schemaFilename));
        // Create the producer once and reuse it for all records
        KafkaProducer<String, Object> producer = new KafkaProducer<String, Object>(props);
        try {
            for (int i = 0; i < 10; i++) {
                GenericRecord avroRecord = new GenericData.Record(schema);
                avroRecord.put("ID", i);
                avroRecord.put("Name", "xxx");
                ProducerRecord<String, Object> record = new ProducerRecord<String, Object>("<topic>", null, avroRecord);
                try {
                    producer.send(record);
                } catch (SerializationException e) {
                    e.printStackTrace();
                }
            }
        } finally {
            producer.flush();
            producer.close();
        }
    }
    
    public static void help() {
       System.out.println("Usage :\n x.sh {path_to_schema_file}");
    }
   
}

Command to run ConfluentKafkaProducer:

java -jar ConfluentKafkaProducer.jar <path-to-schemafile>

KafkaReader + AvroParser configuration to read Confluent wire format

Striim’s KafkaReader with AvroParser (from v3.10.3.4) can parse Avro records written in Confluent’s wire format. An Avro record in Confluent’s wire format always has its schema registered in the schema registry.


Add the following configuration to KafkaConfig in KafkaReader:

value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer

Set “schemaregistryurl” and “schemaregistryconfiguration” in the Avro Parser:

schemaregistryurl: 'https://psrc-4rw99.us-central1.gcp.confluent.cloud',
schemaregistryconfiguration: 'basic.auth.user.info=<SR_API_KEY>:<SR_API_PASSWORD>,basic.auth.credentials.source=USER_INFO'

Also set the other KafkaConfig properties required for a cloud-based schema registry:

KafkaConfig: 'max.request.size==10485760:batch.size==10000120:sasl.mechanism==PLAIN:security.protocol==SASL_SSL:sasl.jaas.config==org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<kafka-cluster-api-key>\" password=\"<kafka-cluster-api-secret>\";'

The following application reads Kafka messages in Confluent’s wire format with the schema registry enabled.

CREATE APPLICATION KafkaConfluentConsumer;

CREATE OR REPLACE SOURCE ConfluentKafkaAvroConsumer USING Global.KafkaReader VERSION '2.1.0' ( 
  AutoMapPartition: true, 
  KafkaConfigValueSeparator: '++', 
  KafkaConfig: 'value.deserializer++io.confluent.kafka.serializers.KafkaAvroDeserializer|security.protocol++SASL_SSL|ssl.endpoint.identification.algorithm++https|sasl.mechanism++PLAIN|sasl.jaas.config++org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<CLUSTER_API_KEY>\" password=\"<CLUSTER_API_SECRET>\";', 
  Topic: 'kafkaTest', 
  KafkaConfigPropertySeparator: '|', 
  brokerAddress: 'pkc-4yyd6.us-east1.gcp.confluent.cloud:9092', 
  adapterName: 'KafkaReader', 
  startOffset: 0 ) 
PARSE USING Global.AvroParser ( 
  handler: 'com.webaction.proc.AvroParser_1_0', 
  schemaregistryurl: 'https://psrc-4rw99.us-central1.gcp.confluent.cloud', 
  schemaregistryconfiguration: 'basic.auth.user.info=<SR_API_KEY>:<SR_API_PASSWORD>,basic.auth.credentials.source=USER_INFO',
  parserName: 'AvroParser' ) 
OUTPUT TO outputStream1;

CREATE OR REPLACE CQ CQ1 
INSERT INTO outputStream2 
SELECT AvroToJson(o.data),
           o.metadata
 FROM outputStream1 o;

CREATE OR REPLACE TARGET FileWriter1 USING Global.FileWriter ( 
  filename: 'confluentOutput1', 
  rolloveronddl: 'false', 
  flushpolicy: 'EventCount:10000,Interval:30s', 
  encryptionpolicy: '', 
  adapterName: 'FileWriter', 
  rolloverpolicy: 'EventCount:10000,Interval:30s' ) 
FORMAT USING Global.JSONFormatter  ( 
  handler: 'com.webaction.proc.JSONFormatter', 
  jsonMemberDelimiter: '\n', 
  EventsAsArrayOfJsonObjects: 'true', 
  formatterName: 'JSONFormatter', 
  jsonobjectdelimiter: '\n' ) 
INPUT FROM outputStream2;

END APPLICATION KafkaConfluentConsumer;

Sample data read by Kafka Reader with Avro Parser from the Kafka topic “kafkaTest”:

offset = 0, key = null, value = {"ID": 0, "Name": "xxx"} 
offset = 1, key = null, value = {"ID": 1, "Name": "xxx"} 
offset = 2, key = null, value = {"ID": 2, "Name": "xxx"} 
offset = 3, key = null, value = {"ID": 3, "Name": "xxx"} 
offset = 4, key = null, value = {"ID": 4, "Name": "xxx"} 
offset = 5, key = null, value = {"ID": 5, "Name": "xxx"} 
offset = 6, key = null, value = {"ID": 6, "Name": "xxx"} 
offset = 7, key = null, value = {"ID": 7, "Name": "xxx"} 
offset = 8, key = null, value = {"ID": 8, "Name": "xxx"} 
offset = 9, key = null, value = {"ID": 9, "Name": "xxx"} 
KafkaReader + AvroParser reading Avro records with no delimiters

Some producers write one Avro record per Kafka message with no delimiters (no magic byte or length prefix). All bytes of the Kafka message are used to construct the Avro record. In this variation there is no way to add a schema registry id, so a schema file name is required for Striim’s Avro Parser to parse the records.
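
As a minimal consumer-side sketch (not Striim’s SingleRecordAvroRecordDeserializer itself), decoding such a message only requires the schema file; every byte of the message value is one Avro record. The schema file passed in here is an assumption.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import java.io.File;

public class WholeMessageAvroDecoder {

    // Decodes one Kafka message value that holds exactly one Avro record and nothing else
    public static GenericRecord decode(byte[] value, File schemaFile) throws Exception {
        Schema schema = new Schema.Parser().parse(schemaFile);
        GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
        return reader.read(null, DecoderFactory.get().binaryDecoder(value, null));
    }
}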

The following producer writes one Avro record per Kafka message, with no length delimiter.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.util.Properties;

public class SingleAvroRecordKafkaProducer {

    public static void main(String[] args) {
        if(args.length != 1) {
           help();
           System.exit(-1);
        }
        String schemaFilename = args[0];
        /* Schema used in this sample
        * {
        *   "namespace": "mytype.avro",
        *  "type" : "record",
        *   "name": "Array_Record",
        *  "fields": [
        *   {"name" : "ID", "type" : [ "null" , "int" ] },
        *   {"name" : "Name", "type" : [ "null" , "string" ] }
        *  ]
        * }
        */
        Schema schema = new Schema.Parser().parse(new File(schemaFilename));

        Properties props = new Properties();
        props.put("bootstrap.servers", "pkc-4yyd6.us-east1.gcp.confluent.cloud:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"pkc-4yyd6.us-east1.gcp.confluent.cloud:9092");
        props.put("security.protocol","SASL_SSL");
        props.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule   required         username=\"<CLUSTER_API_KEY>\"   password=\"<CLUSTER_API_SECRET>\";");
        props.put("ssl.endpoint.identification.algorithm","https");
        props.put("sasl.mechanism","PLAIN");

        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
        
        try {
            // One datum writer can be reused for all records of the same schema
            DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
            for (int nEvents = 0; nEvents < 10; nEvents++) {
                GenericRecord avrorecord = new GenericData.Record(schema);
                avrorecord.put("ID", nEvents);
                avrorecord.put("Name", "xxx");
                // Serialize with a plain binary encoder: no magic byte, no length prefix
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                BinaryEncoder binaryEncoder = EncoderFactory.get().directBinaryEncoder(out, null);
                datumWriter.write(avrorecord, binaryEncoder);
                binaryEncoder.flush();
                out.close();
                byte[] serializedBytes = out.toByteArray();
                ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>("<topicName>", null, serializedBytes);
                producer.send(record);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Flush and close the producer only after all records have been sent
            producer.flush();
            producer.close();
        }
    }
    
    public static void help() {
       System.out.println("Usage :\n x.sh {path_to_schema_file}");
    }
}

Command to run SingleAvroRecordKafkaProducer:

java -jar SingleAvroRecordKafkaProducer.jar <path-to-schemafile>

Kafka Reader with Avro Parser - one Avro record in Kafka message


Kafka Reader with Avro Parser can read a single Avro record per Kafka message that is not length delimited. Add the configuration “value.deserializer=com.striim.avro.deserializer.SingleRecordAvroRecordDeserializer” to KafkaConfig.

CREATE APPLICATION SingleAvroRecordKafkaConsumerTest;

CREATE OR REPLACE SOURCE SingleAvroRecordKafkaConsumer USING Global.KafkaReader VERSION '2.1.0' ( 
  AutoMapPartition: true, 
  brokerAddress: 'localhost:9092', 
  KafkaConfigValueSeparator: '++', 
  KafkaConfigPropertySeparator: '|', 
  KafkaConfig: 'value.deserializer++com.striim.avro.deserializer.SingleRecordAvroRecordDeserializer', 
  adapterName: 'KafkaReader', 
  Topic: 'avroTest1', 
  startOffset: 0 ) 
PARSE USING Global.AvroParser ( 
  schemaFileName: './Product/Samples/AppData/AvroSchema/arrayschema.avsc', 
  handler: 'com.webaction.proc.AvroParser_1_0', 
  parserName: 'AvroParser' ) 
OUTPUT TO outputStream1;

CREATE OR REPLACE CQ CQ1 INSERT INTO outputStream2 SELECT 
-- conversion from org.apache.avro.util.Utf8 to String is required here
 o.data, o.metadata FROM outputStream1 o;

CREATE OR REPLACE TARGET FileWriter1 USING Global.FileWriter ( 
  filename: 'confluentOutput1', 
  rolloveronddl: 'false', 
  flushpolicy: 'EventCount:10000,Interval:30s', 
  encryptionpolicy: '', 
  adapterName: 'FileWriter', 
  rolloverpolicy: 'EventCount:10000,Interval:30s' ) 
FORMAT USING Global.JSONFormatter  ( 
  handler: 'com.webaction.proc.JSONFormatter', 
  jsonMemberDelimiter: '\n', 
  EventsAsArrayOfJsonObjects: 'true', 
  formatterName: 'JSONFormatter', 
  jsonobjectdelimiter: '\n' ) 
INPUT FROM outputStream2;

END APPLICATION SingleAvroRecordKafkaConsumerTest;

Sample Output of SingleAvroRecordKafkaConsumerTest (excerpt):

 {
  "data":{"ID": 386, "Name": "xxx"},
  "metadata":{"KafkaRecordOffset":386,"PartitionID":0,"TopicName":"avroTest1"}
 },
 {
  "data":{"ID": 387, "Name": "xxx"},
  "metadata":{"KafkaRecordOffset":387,"PartitionID":0,"TopicName":"avroTest1"}
 },
 {
  "data":{"ID": 388, "Name": "xxx"},
  "metadata":{"KafkaRecordOffset":388,"PartitionID":0,"TopicName":"avroTest1"}
 },
 {
  "data":{"ID": 389, "Name": "xxx"},
  "metadata":{"KafkaRecordOffset":389,"PartitionID":0,"TopicName":"avroTest1"}
 },
 {
  "data":{"ID": 390, "Name": "xxx"},
  "metadata":{"KafkaRecordOffset":390,"PartitionID":0,"TopicName":"avroTest1"}
 },
 {
  "data":{"ID": 391, "Name": "xxx"},
  "metadata":{"KafkaRecordOffset":391,"PartitionID":0,"TopicName":"avroTest1"}
 },
 {
  "data":{"ID": 392, "Name": "xxx"},
  "metadata":{"KafkaRecordOffset":392,"PartitionID":0,"TopicName":"avroTest1"}
 },
 {
  "data":{"ID": 393, "Name": "xxx"},
  "metadata":{"KafkaRecordOffset":393,"PartitionID":0,"TopicName":"avroTest1"}
 },
 {
  "data":{"ID": 394, "Name": "xxx"},
  "metadata":{"KafkaRecordOffset":394,"PartitionID":0,"TopicName":"avroTest1"}
 },
 {
  "data":{"ID": 395, "Name": "xxx"},
  "metadata":{"KafkaRecordOffset":395,"PartitionID":0,"TopicName":"avroTest1"}
 },
 {
  "data":{"ID": 396, "Name": "xxx"},
  "metadata":{"KafkaRecordOffset":396,"PartitionID":0,"TopicName":"avroTest1"}
 }