Using the Confluent or Hortonworks schema registry

Note

This feature requires Kafka 0.10 or later (0.11 or later recommended). When the schema registry is in Confluent Cloud, Kafka 0.11 or later is required.

You may use the Confluent or Hortonworks schema registry by selecting AvroFormatter and specifying its schemaRegistryURL property, for example, schemaRegistryURL:'http://198.51.100.55:8081'.

Tracking schema evolution of database tables

When a KafkaWriter target's input stream is the output stream of a DatabaseReader or CDC reader source, the schema registry allows you to track the evolution of database tables over time. The first time the Striim application is run, KafkaWriter creates a record in the schema registry for each table being read. Except when using Confluent's wire format, each schema record has a unique ID, which is stored in the four bytes (bytes 4-7) that follow the record length (bytes 0-3) in the data records for the associated table.

Each time KafkaWriter receives a DDL event, it writes a new schema record for the referenced table. From then until the next schema change, that schema record's unique ID is stored in data records for the associated table.
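
The header layout described above can be read on the consumer side with a few lines of Java. This is a minimal sketch, not Striim's deserializer: the class name RecordHeader is hypothetical, and it assumes both four-byte integers are big-endian, which the text above does not state. It does not apply to Confluent's wire format.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical helper (not part of Striim or Kafka): reads the two 4-byte
// header fields described above from a raw record value.
final class RecordHeader {
    final int recordLength; // bytes 0-3
    final int schemaId;     // bytes 4-7

    private RecordHeader(int recordLength, int schemaId) {
        this.recordLength = recordLength;
        this.schemaId = schemaId;
    }

    // Assumption: both integers are stored big-endian.
    static RecordHeader parse(byte[] value) {
        if (value == null || value.length < 8) {
            throw new IllegalArgumentException("need at least 8 header bytes");
        }
        ByteBuffer buf = ByteBuffer.wrap(value).order(ByteOrder.BIG_ENDIAN);
        int length = buf.getInt(0); // absolute read at offset 0
        int id = buf.getInt(4);     // absolute read at offset 4
        return new RecordHeader(length, id);
    }

    public static void main(String[] args) {
        // Synthetic header for illustration: length 42, schema ID 261.
        byte[] sample = ByteBuffer.allocate(8).putInt(42).putInt(261).array();
        RecordHeader h = parse(sample);
        System.out.println("length=" + h.recordLength + " schemaId=" + h.schemaId);
    }
}
```

The schema ID obtained this way is the value a consumer would use to look up the matching schema record in the registry.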

If the source has a CDDL Capture property, it must be set to True, and CDDL Action must be set to Process.

In this case, in addition to specifying AvroFormatter's schemaRegistryURL property, you must set its formatAs property to table or native.

When the formatAs property is table or native and any of the special characters listed in Using non-default case and special characters in table identifiers are used in source column names, they will be included as aliases in the Avro schema fields.

Tracking evolution of Striim stream types

When a KafkaWriter target's input stream is of a user-defined type, the schema registry allows you to track the evolution of that type over time. The first time the Striim application is run, KafkaWriter creates a record in the schema registry for the input stream's type. Each time the KafkaWriter's input stream's type is changed using ALTER and RECOMPILE, KafkaWriter creates a new schema record. As with database tables, the schema records' unique IDs associate them with their corresponding data records.

In this case, specify AvroFormatter's schemaRegistryURL property and leave formatAs unspecified or set to default.

Reading schema and data records together

Consuming applications can use the following code to combine schema and data records into Avro records:

package test.kafka.avro;
 
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
 
import org.apache.kafka.common.TopicPartition;
 
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Properties;
import java.util.List;
 
public class KafkaAvroConsumerUtilWithDeserializer {
 
    private KafkaConsumer<byte[], Object> consumer;
     
    public KafkaAvroConsumerUtilWithDeserializer(String configFileName) throws Exception {
         
        Properties props = new Properties();
        // Close the config file as soon as the properties are loaded.
        try (InputStream in = new FileInputStream(configFileName)) {
            props.load(in);
        }
 
        this.consumer = new KafkaConsumer<byte[], Object>(props);
     
        TopicPartition tp = new TopicPartition(props.getProperty("topic.name"), 0);
        List<TopicPartition> tpList = new ArrayList<TopicPartition>();
        tpList.add(tp);
        this.consumer.assign(tpList);
        this.consumer.seekToBeginning(tpList);
    }
     
    public void consume() throws Exception {
        while(true) {
            ConsumerRecords<byte[], Object> records = consumer.poll(1000);
            for(ConsumerRecord<byte[], Object> record : records) {
                System.out.println("Topic " + record.topic() + " partition " + record.partition()
                    + " offset " + record.offset() + " timestamp " + record.timestamp());
                List<GenericRecord> avroRecordList = (List<GenericRecord>) record.value();
                for(GenericRecord avroRecord : avroRecordList) {
                    System.out.println(avroRecord);
                }
            }
        }
    }  
 
    public void close() throws Exception {
        if(this.consumer != null) {
            this.consumer.close();
            this.consumer = null;
        }
    }
     
    public static void help() {
        System.out.println("Usage :\n x.sh {path_to_config_file}");
    }
     
    public static void main(String[] args) throws Exception {
        if(args.length != 1) {
            help();
            System.exit(-1);
        }
        String configFileName = args[0];
        System.out.println("KafkaConsumer config file : " + configFileName);
        KafkaAvroConsumerUtilWithDeserializer consumerutil = null;
        try {
            consumerutil = new KafkaAvroConsumerUtilWithDeserializer(configFileName);
            consumerutil.consume();
        } finally {
            if(consumerutil != null) {
                consumerutil.close();
                consumerutil = null;
            }
        }
    }
}

The pom.xml for that class must include the following dependencies. Adjust the Kafka version to match your environment.

<dependencies>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.7.7</version>
  </dependency>
</dependencies>

StriimKafkaAvroDeserializer.jar must be in the classpath when you start your application. You can download StriimKafkaAvroDeserializer.jar from https://github.com/striim/doc-downloads.

The config file for your Kafka consumer should be similar to:

bootstrap.servers=192.168.1.35:9092
topic.name=test_sync_registry
schemaregistry.url=http://192.168.1.35:8081/
key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
value.deserializer=com.striim.kafka.deserializer.KafkaAvroDeserializer
group.id=KafkaAvroDemoConsumer
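
Before constructing the consumer, it can help to confirm that the config file defines every key shown in the sample above. The following is a minimal sketch: the class name ConsumerConfigCheck is hypothetical, and treating all six keys as required is an assumption based only on that sample.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical pre-flight check for the consumer config file.
final class ConsumerConfigCheck {
    // Keys taken from the sample config file; assumed to all be required.
    static final String[] REQUIRED = {
        "bootstrap.servers", "topic.name", "schemaregistry.url",
        "key.deserializer", "value.deserializer", "group.id"
    };

    // Returns the required keys that are absent or blank.
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) throws IOException {
        // Deliberately incomplete config to show the check reporting gaps.
        String text = "bootstrap.servers=192.168.1.35:9092\n"
                    + "topic.name=test_sync_registry\n";
        Properties props = new Properties();
        props.load(new StringReader(text));
        System.out.println("Missing keys: " + missingKeys(props));
    }
}
```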

The command to start the consumer with the deserializer should be something like:

java -Xmx1024m -Xms256m -Djava.library.path=/usr/local/lib:/usr/bin/java \
  -cp "target/classes:target/dependency/*:<path to StriimKafkaAvroDeserializer.jar>" \
  test.kafka.avro.KafkaAvroConsumerUtilWithDeserializer $*

Schema registry REST API calls

The following Kafka schema registry REST API calls may be useful when working with the schema registry.

To list all the subjects in the schema registry, use curl -X GET http://localhost:8081/subjects. This will return a list of subjects:

["AVRODEMO.EMP","AVRODEMO.DEPT","DDLRecord"]

This shows that there are schemas for the EMP and DEPT tables. DDLRecord defines the schemas for storing DDL events.

To list the versions for a particular schema, use curl -X GET http://localhost:8081/subjects/<subject>/versions. If there were three versions of the EMP table's schema, curl -X GET http://localhost:8081/subjects/AVRODEMO.EMP/versions would return:

[1,2,3]

To see the second version of that schema, you would use curl -X GET http://localhost:8081/subjects/AVRODEMO.EMP/versions/2:

{
    "subject": "AVRODEMO.EMP",
    "version": 2,
    "id": 261,
    "schema": {
        "type": "record",
        "name": "EMP",
        "namespace": "AVRODEMO",
        "fields": [{ ...

The first three lines are Kafka metadata. The subject property, which identifies the table, is the same for all schema records associated with that table. The version property records the order in which the schema versions were created. id is the schema's unique identifier. The rest of the output is the schema definition in one of the formats shown below.
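
The same three calls can be issued from Java. This is a minimal sketch using only the JDK's HttpURLConnection: the class name SchemaRegistryClientSketch is hypothetical, the base URL is the localhost address from the curl examples, and subject names are assumed not to need URL encoding.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Hypothetical helper that builds the registry URLs used in the curl examples.
final class SchemaRegistryClientSketch {
    private final String baseUrl;

    SchemaRegistryClientSketch(String baseUrl) {
        // Drop a trailing slash so paths can be appended uniformly.
        this.baseUrl = baseUrl.endsWith("/")
            ? baseUrl.substring(0, baseUrl.length() - 1) : baseUrl;
    }

    String subjectsUrl() { return baseUrl + "/subjects"; }

    // Assumption: the subject contains no characters that need URL encoding.
    String versionsUrl(String subject) { return subjectsUrl() + "/" + subject + "/versions"; }

    String versionUrl(String subject, int version) { return versionsUrl(subject) + "/" + version; }

    // Issues a GET request and returns the response body as text.
    static String get(String url) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("GET");
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            StringBuilder body = new StringBuilder();
            String line;
            while ((line = reader.readLine()) != null) {
                body.append(line).append('\n');
            }
            return body.toString();
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) throws IOException {
        String base = args.length > 0 ? args[0] : "http://localhost:8081";
        SchemaRegistryClientSketch client = new SchemaRegistryClientSketch(base);
        System.out.println(client.versionUrl("AVRODEMO.EMP", 2));
        // Only contact the registry when a base URL is passed explicitly.
        if (args.length > 0) {
            System.out.println(get(client.subjectsUrl()));
        }
    }
}
```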

Record formats for database schema evolution

For example, say you have the following Oracle table:

CREATE TABLE EMP(  
  EMPNO    NUMBER(4,0),  
  ENAME    VARCHAR2(10),  
  JOB      VARCHAR2(9),  
  HIREDATE TIMESTAMP,  
  SAL      BINARY_DOUBLE,  
  COMM     BINARY_FLOAT,  
  DEPTNO   INT ...

Let's start by looking at the default format of an Avro-formatted Kafka record for an input type of WAEvent:

{
    "type": "record",
    "name": "WAEvent",
    "namespace": "WAEvent.avro",
    "fields": [{
        "name": "metadata",
        "type": ["null", {
            "type": "map",
            "values": ["null", "string"]
        }]
    }, {
        "name": "data",
        "type": ["null", {
            "type": "map",
            "values": ["null", "string"]
        }]
    }, {
        "name": "before",
        "type": ["null", {
            "type": "map",
            "values": ["null", "string"]
        }]
    }, {
        "name": "userdata",
        "type": ["null", {
            "type": "map",
            "values": ["null", "string"]
        }]
    }, {
        "name": "__striimmetadata",
        "type": {
            "type": "record",
            "name": "StriimMeta_Record",
            "fields": [{
                "name": "position",
                "type": ["null", "string"]
            }]
        }
    }]
}

For more information, see WAEvent contents for change data and Parsing the fields of WAEvent for CDC readers.

AvroFormatter's formatAs property allows two other formats, table and native. When you use one of these, the first time an event is received from the table, a record is written in the schema registry in the following format:

formatAs: 'table'

{
    "type": "record",
    "name": "EMP",
    "namespace": "AVRODEMO",
    "fields": [{
        "name": "EMPNO",
        "type": ["null", "string"]
    }, {
        "name": "ENAME",
        "type": ["null", "string"]
    }, {
        "name": "JOB",
        "type": ["null", "string"]
    }, {
        "name": "HIREDATE",
        "type": ["null", "string"]
    }, {
        "name": "SAL",
        "type": ["null", "double"]
    }, {
        "name": "COMM",
        "type": ["null", "float"]
    }, {
        "name": "DEPTNO",
        "type": ["null", "string"]
    }, {
        "name": "__striimmetadata",
        "type": {
            "type": "record",
            "name": "StriimMeta_Record",
            "fields": [{
                "name": "position",
                "type": ["null", "string"]
            }]
        }
    }]
}

formatAs: 'native'

{
    "type": "record",
    "name": "EMP",
    "namespace": "AVRODEMO",
    "fields": [{
        "name": "data",
        "type": ["null", {
            "type": "record",
            "name": "data_record",
            "namespace": "data_record",
            "fields": [{
                "name": "EMPNO",
                "type": ["null", "string"]
            }, {
                "name": "ENAME",
                "type": ["null", "string"]
            }, {
                "name": "JOB",
                "type": ["null", "string"]
            }, {
                "name": "HIREDATE",
                "type": ["null", "string"]
            }, {
                "name": "SAL",
                "type": ["null", "double"]
            }, {
                "name": "COMM",
                "type": ["null", "float"]
            }, {
                "name": "DEPTNO",
                "type": ["null", "string"]
            }]
        }]
    }, {
        "name": "before",
        "type": ["null", {
            "type": "record",
            "name": "before_record",
            "namespace": "before_record",
            "fields": [{
                "name": "EMPNO",
                "type": ["null", "string"]
            }, {
                "name": "ENAME",
                "type": ["null", "string"]
            }, {
                "name": "JOB",
                "type": ["null", "string"]
            }, {
                "name": "HIREDATE",
                "type": ["null", "string"]
            }, {
                "name": "SAL",
                "type": ["null", "double"]
            }, {
                "name": "COMM",
                "type": ["null", "float"]
            }, {
                "name": "DEPTNO",
                "type": ["null", "string"]
            }]
        }]
    }, {
        "name": "metadata",
        "type": ["null", {
            "type": "map",
            "values": ["null", "string"]
        }]
    }, {
        "name": "userdata",
        "type": ["null", {
            "type": "map",
            "values": ["null", "string"]
        }]
    }, {
        "name": "datapresenceinfo",
        "type": ["null", {
            "type": "record",
            "name": "datapresenceinfo_record",
            "namespace": "datapresenceinfo_record",
            "fields": [{
                "name": "EMPNO",
                "type": "boolean"
            }, {
                "name": "ENAME",
                "type": "boolean"
            }, {
                "name": "JOB",
                "type": "boolean"
            }, {
                "name": "HIREDATE",
                "type": "boolean"
            }, {
                "name": "SAL",
                "type": "boolean"
            }, {
                "name": "COMM",
                "type": "boolean"
            }, {
                "name": "DEPTNO",
                "type": "boolean"
            }]
        }]
    }, {
        "name": "beforepresenceinfo",
        "type": ["null", {
            "type": "record",
            "name": "beforepresenceinfo_record",
            "namespace": "beforepresenceinfo_record",
            "fields": [{
                "name": "EMPNO",
                "type": "boolean"
            }, {
                "name": "ENAME",
                "type": "boolean"
            }, {
                "name": "JOB",
                "type": "boolean"
            }, {
                "name": "HIREDATE",
                "type": "boolean"
            }, {
                "name": "SAL",
                "type": "boolean"
            }, {
                "name": "COMM",
                "type": "boolean"
            }, {
                "name": "DEPTNO",
                "type": "boolean"
            }]
        }]
    }, {
        "name": "__striimmetadata",
        "type": {
            "type": "record",
            "name": "StriimMeta_Record",
            "fields": [{
                "name": "position",
                "type": ["null", "string"]
            }]
        }
    }]
}

The native format includes more WAEvent data than the table format.

Records are also created for ALTER TABLE and CREATE TABLE DDL operations. The schema for DDL events is:

{
    "type": "record",
    "name": "DDLRecord",
    "namespace": "DDLRecord.avro",
    "fields": [{
        "name": "metadata",
        "type": ["null", {
            "type": "map",
            "values": ["null", "string"]
        }]
    }, {
        "name": "data",
        "type": ["null", {
            "type": "map",
            "values": ["null", "string"]
        }]
    }, {
        "name": "userdata",
        "type": ["null", {
            "type": "map",
            "values": ["null", "string"]
        }]
    }, {
        "name": "__striimmetadata",
        "type": {
            "type": "record",
            "name": "StriimMeta_Record",
            "fields": [{
                "name": "position",
                "type": ["null", "string"]
            }]
        }
    }]
}

Records for DDL events have this format:

{
    "metadata": {
        "CURRENTSCN": "386346466",
        "OperationName": "ALTER",
        "OperationSubName": "ALTER_TABLE_ADD_COLUMN",
        "TimeStamp": "2018-04-17T04:31:48.000-07:00",
        "ObjectName": "EMP",
        "COMMITSCN": "386346473",
        "TxnID": "9.10.60399",
        "COMMIT_TIMESTAMP": "2018-04-17T04:31:48.000-07:00",
        "CatalogName": null,
        "CatalogObjectType": "TABLE",
        "OperationType": "DDL",
        "SchemaName": "AVRODEMO",
        "STARTSCN": "386346462",
        "SCN": "386346466"
    },
    "data": {
        "DDLCommand": "ALTER TABLE AVRODEMO.EMP\n  ADD phoneNo NUMBER(10,0)"
    },
    "userdata": null,
    "__striimmetadata": {
        "position": "TQAAQA="
    }
}

This example is for an ALTER TABLE EMP ADD phoneNo NUMBER(10,0); command.
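
A consumer that receives a DDL record may want to know which registry subject it affects. The sketch below derives a subject name from the metadata map, assuming subjects follow the SchemaName.ObjectName pattern seen in the AVRODEMO.EMP example. The class and method names are hypothetical, and the metadata is modeled as a plain Map<String, String> rather than a decoded Avro record.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: maps a DDL event's metadata to a registry subject name.
final class DdlSubjectSketch {
    // Returns "<SchemaName>.<ObjectName>" for DDL events, or null when the
    // event is not DDL or either name is missing.
    // Assumption: subjects are named SchemaName.ObjectName, as in AVRODEMO.EMP.
    static String subjectFor(Map<String, String> metadata) {
        if (!"DDL".equals(metadata.get("OperationType"))) {
            return null;
        }
        String schema = metadata.get("SchemaName");
        String object = metadata.get("ObjectName");
        if (schema == null || object == null) {
            return null;
        }
        return schema + "." + object;
    }

    public static void main(String[] args) {
        // Values copied from the sample DDL record above.
        Map<String, String> metadata = new HashMap<>();
        metadata.put("OperationType", "DDL");
        metadata.put("OperationSubName", "ALTER_TABLE_ADD_COLUMN");
        metadata.put("SchemaName", "AVRODEMO");
        metadata.put("ObjectName", "EMP");
        System.out.println(subjectFor(metadata)); // prints AVRODEMO.EMP
    }
}
```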

Records are also created for BEGIN, COMMIT, and ROLLBACK control events. The schema for control events is:

{
    "type": "record",
    "name": "DDLRecord",
    "namespace": "DDLRecord.avro",
    "fields": [{
        "name": "metadata",
        "type": ["null", {
            "type": "map",
            "values": ["null", "string"]
        }]
    }, {
        "name": "data",
        "type": ["null", {
            "type": "map",
            "values": ["null", "string"]
        }]
    }, {
        "name": "userdata",
        "type": ["null", {
            "type": "map",
            "values": ["null", "string"]
        }]
    }, {
        "name": "__striimmetadata",
        "type": {
            "type": "record",
            "name": "StriimMeta_Record",
            "fields": [{
                "name": "position",
                "type": ["null", "string"]
            }]
        }
    }]
}

Records for control events have this format:

{
	"metadata": {
		"RbaSqn": "2031",
		"AuditSessionId": "439843",
		"MockedBegin": null,
		"CURRENTSCN": "8328113",
		"OperationName": "COMMIT",
		"SQLRedoLength": "6",
		"BytesProcessed": "424",
		"ParentTxnID": "5.17.9129",
		"SessionInfo": "UNKNOWN",
		"RecordSetID": " 0x0007ef.000001e3.00e0 ",
		"TimeStamp": "2018-06-16T17:41:57.000-07:00",
		"TxnUserID": "QATEST",
		"RbaBlk": "483",
		"COMMITSCN": "8328113",
		"TxnID": "5.17.9129",
		"Serial": "2943",
		"ThreadID": "1",
		"SEQUENCE": "1",
		"COMMIT_TIMESTAMP": "2018-06-16T17:41:57.000-07:00",
		"TransactionName": "",
		"STARTSCN": "8328112",
		"SCN": "832811300005716756777309964480000",
		"Session": "135"
	},
	"userdata": null
}

Record format for Striim stream type evolution

For example, say you have the following Striim type:

CREATE TYPE Person_Type (
  ID int KEY, 
  CITY string,
  CODE string,
  NAME string);

The schema record for the Striim type above would be:

{
    "type": "record",
    "name": "Person_Type",
    "namespace": "AVRODEMO",
    "fields": [{
        "name": "ID",
        "type": ["null", "int"]
    }, {
        "name": "CITY",
        "type": ["null", "string"]
    }, {
        "name": "CODE",
        "type": ["null", "string"]
    }, {
        "name": "NAME",
        "type": ["null", "string"]
    }, {
        "name": "__striimmetadata",
        "type": {
            "type": "record",
            "name": "StriimMeta_Record",
            "fields": [{
                "name": "position",
                "type": ["null", "string"]
            }]
        }
    }]
}

This shows the input stream's type name (Person_Type), which will be the same in all schema registry records for this KafkaWriter target, and the names, default values, and types of its four fields at the time this record was created.
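
To make the correspondence between the type definition and the schema record concrete, the sketch below builds schema JSON of the same shape from a list of field names and Avro type names. It is an illustration only, not Striim's implementation: the class name TypeSchemaSketch is hypothetical, and it simply mirrors the pattern visible in the record above, where every field is a ["null", type] union followed by __striimmetadata.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of the schema record's shape; not Striim code.
final class TypeSchemaSketch {
    // Builds compact Avro schema JSON: each field becomes a ["null", <type>]
    // union, and the __striimmetadata record is appended last.
    static String toAvroSchema(String name, String namespace, Map<String, String> fields) {
        StringBuilder sb = new StringBuilder();
        sb.append("{\"type\":\"record\",\"name\":\"").append(name)
          .append("\",\"namespace\":\"").append(namespace)
          .append("\",\"fields\":[");
        for (Map.Entry<String, String> field : fields.entrySet()) {
            sb.append("{\"name\":\"").append(field.getKey())
              .append("\",\"type\":[\"null\",\"").append(field.getValue())
              .append("\"]},");
        }
        sb.append("{\"name\":\"__striimmetadata\",\"type\":{\"type\":\"record\",")
          .append("\"name\":\"StriimMeta_Record\",\"fields\":[")
          .append("{\"name\":\"position\",\"type\":[\"null\",\"string\"]}]}}]}");
        return sb.toString();
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the declaration order of the type's fields.
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("ID", "int");
        fields.put("CITY", "string");
        fields.put("CODE", "string");
        fields.put("NAME", "string");
        System.out.println(toAvroSchema("Person_Type", "AVRODEMO", fields));
    }
}
```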

Sample record in sync mode:

{
  "ID": "1",
  "Code": "IYuqAbAQ07NS3lZO74VGPldfAUAGKwzR2k3",
  "City": "Chennai",
  "Name": "Ramesh",
  "__striimmetadata": {
    "position": "iUAB6JoOLYlCsaKn8nWYw"
  }
}

Schema registry sample application: OracleReader to KafkaWriter

The following example assumes you have the following tables in Oracle:

create table DEPT(
  deptno int,
  dname varchar2(30),
  loc varchar2(13),
  constraint pk_dept primary key (deptno)
);
create table EMP(
  empno number(4,0),
  ename varchar2(10),
  job varchar2(9),
  hiredate Timestamp,
  sal binary_double,
  comm binary_float,
  deptno int,
  constraint pk_emp primary key (empno),
  constraint fk_deptno foreign key (deptno) references dept (deptno)
);

This application will read from those tables and write to Kafka in native format:

CREATE APPLICATION Oracle2Kafka;

CREATE OR REPLACE SOURCE OracleSource USING OracleReader  (
  DictionaryMode: 'OfflineCatalog',
  FetchSize: 1,
  Username: 'sample',
  Password: 'sample',
  ConnectionURL: '10.1.10.11:1521:orcl',
  Tables: 'sample.emp;sample.dept'
 )
OUTPUT TO OracleStream ;
 
CREATE OR REPLACE TARGET WriteToKafka USING KafkaWriter VERSION '0.11.0' (
  Mode: 'Sync',
  Topic: 'test',
  brokerAddress: 'localhost:9093',
  KafkaConfig: 'request.timeout.ms=60001;session.timeout.ms=60000'
 )
FORMAT USING AvroFormatter  (
  schemaregistryurl: 'http://localhost:8081/',
  formatAs: 'Native'
 )
INPUT FROM OracleStream;

END APPLICATION Oracle2Kafka;

See Avro Parser for a sample application that reads from that Kafka topic.