
Creating a custom Kafka partitioner

The following example sends KafkaWriter output to partition 0 or 1, depending on the value of the PARTITION BY field of the target's input stream or of the PartitionKey property. It assumes you are using Eclipse, but it can be adapted to any Java development environment.

  1. Create a new Java project.

  2. After naming the project and making any other necessary changes, click Next > Libraries > Add External JARs, navigate to Striim/lib, and double-click Common-4.2.0.jar.

  3. Finish creating the new Java project.

  4. Add a new class with package name com.custompartitioner and class name KafkaCityPartitioner.

  5. Replace the default contents of the new class with the following:

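    package com.custompartitioner;
    import com.webaction.kafka.PartitionerIntf;
    import java.util.ArrayList;
    
    public class KafkaCityPartitioner implements PartitionerIntf {
    
    	@Override
    	public void close() {
    		// This partitioner holds no resources, so there is nothing to release.
    	}
    
    	// Returns 0 when the first partition key value is "Amsterdam"
    	// (case-insensitive) and 1 in every other case.
    	@Override
    	@SuppressWarnings("unchecked")
    	public int partition(String topic, Object keylist, Object event, int noOfPartitions) {
    		// This example needs both partition 0 and partition 1 to exist.
    		if(noOfPartitions < 2) {
    			throw new RuntimeException("Number of partitions is less than 2");
    		}
    		if(keylist != null) {
    			ArrayList<String> partitionKeyList = (ArrayList<String>) keylist;
    			// Guard against an empty key list before reading the first key.
    			if(!partitionKeyList.isEmpty()) {
    				String partitionKey = partitionKeyList.get(0);
    				// Constant-first comparison also tolerates a null key value.
    				if("Amsterdam".equalsIgnoreCase(partitionKey)) {
    					return 0;
    				}
    			}
    		}
    		// Missing keys and all other key values go to partition 1.
    		return 1;
    	}
    
    }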
    

    If the partition key field value is Amsterdam, the event is written to Kafka partition 0; otherwise, it is written to partition 1. You can extend this logic to handle additional values and partitions by adding else clauses. To guarantee that there are no duplicate events after recovery, the partitioning logic must always map a given partition key to the same partition number.

  6. If Eclipse's Build Automatically option is disabled, build the project.

  7. Click File > Export > Java > JAR File > Next, select the class, and click Finish.

  8. Save the JAR file in Striim/lib.

  9. Restart Striim.

  10. To test the custom partitioner, create an application using this TQL. If you are not using Striim's internal Kafka instance, change the broker address to that of your Kafka instance.

    CREATE SOURCE CSVSource USING FileReader  ( 
      positionbyeof: false,
      directory: 'Samples',
      wildcard: 'city.csv'
    ) 
    PARSE USING DSVParser () 
    OUTPUT TO CSVStream;
    
    CREATE CQ CQ1
    INSERT INTO CQStream
    SELECT data[0] as data0 java.lang.Object,
      data[1] as data1 java.lang.Object,
      data[2] as data2 java.lang.Object,
      data[3] as data3 java.lang.Object
    FROM CSVStream;
    
    
    CREATE TARGET KafkaTarget USING KafkaWriter VERSION '0.9.0' ( 
      Topic: 'test01',
      brokerAddress: 'localhost:9092',
      KafkaConfig: 'partitioner.class=com.custompartitioner.KafkaCityPartitioner'
    ) 
    FORMAT USING DSVFormatter()
    INPUT FROM CQStream;

    Save the following as Striim/Samples/city.csv:

    Amsterdam,0,0,0
    Los Angeles,1,1,1
    Amsterdam,2,2,0
    Los Angeles,3,3,1
    Amsterdam,4,4,0
    Los Angeles,5,5,1
    Amsterdam,6,6,0
    Los Angeles,7,7,1
    Amsterdam,8,8,0
    Los Angeles,9,9,1

    Deploy and run the application.
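
Step 5 notes that the partitioning logic can be extended to more values and partitions, provided each partition key always maps to the same partition number. The following is a minimal sketch of one way to do that, not Striim's own implementation. The class name CityPartitionSketch, the CITY_TO_PARTITION table, and the hash-based fallback for unlisted keys are all assumptions made for illustration. So that it compiles without the Striim Common JAR, the sketch does not implement PartitionerIntf; the body of its partition method would have to be moved into a class like KafkaCityPartitioner to be used with KafkaWriter.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical stand-alone sketch; not part of Striim or Kafka.
class CityPartitionSketch {

    // Assumed lookup table: each listed city is pinned to a fixed partition.
    private static final Map<String, Integer> CITY_TO_PARTITION = new HashMap<>();
    static {
        CITY_TO_PARTITION.put("amsterdam", 0);
        CITY_TO_PARTITION.put("los angeles", 1);
    }

    // Same decision inputs as the partition method in step 5:
    // the key list and the number of partitions in the topic.
    static int partition(Object keylist, int noOfPartitions) {
        if (noOfPartitions < 2) {
            throw new IllegalArgumentException("Number of partitions is less than 2");
        }
        if (keylist instanceof List) {
            List<?> keys = (List<?>) keylist;
            if (!keys.isEmpty() && keys.get(0) != null) {
                // Normalize case so "Amsterdam" and "AMSTERDAM" map identically.
                String key = keys.get(0).toString().toLowerCase(Locale.ROOT);
                Integer pinned = CITY_TO_PARTITION.get(key);
                if (pinned != null && pinned < noOfPartitions) {
                    return pinned;
                }
                // Assumed fallback: String.hashCode is fully specified, so the
                // same key always yields the same partition for a given count.
                return Math.floorMod(key.hashCode(), noOfPartitions);
            }
        }
        // As in step 5, events without a usable key go to partition 1.
        return 1;
    }

    public static void main(String[] args) {
        // The two cities from city.csv, checked against a two-partition topic.
        for (String city : Arrays.asList("Amsterdam", "Los Angeles")) {
            System.out.println(city + " -> partition " + partition(Arrays.asList(city), 2));
        }
    }
}
```

Because the mapping depends only on the key and the partition count, it stays stable across restarts as long as the lookup table and the number of partitions in the topic do not change.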