Reading a Kafka stream with an external Kafka consumer

To read a Kafka stream's persisted data in an external application:

  • Include dataformat:'avro' in the stream's property set.

  • Generate an Avro schema file for the stream using the console command EXPORT <namespace>.<stream name>. This will create a schema file <namespace>_<stream name>_schema.avsc in the Striim program directory. Optionally, you may specify a path or file name using EXPORT <namespace>.<stream name> '<path>' '<file name>'. The .avsc extension will be added automatically.

  • Copy the schema file to a location accessible by the external application.

  • In the external application, use the ZooKeeper and broker addresses from the stream's property set, and reference the stream using <namespace>_<stream name> (see the example following this list).
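
For example, to prepare the Samples.PosDataStream stream used in the program below, the console commands would look something like this (the '/tmp/schemas/' path and 'PosData' file name are purely illustrative):

EXPORT Samples.PosDataStream
EXPORT Samples.PosDataStream '/tmp/schemas/' 'PosData'

The first form writes Samples_PosDataStream_schema.avsc to the Striim program directory; the second writes PosData.avsc to /tmp/schemas/. In either case, the external application reads from the Kafka topic Samples_PosDataStream.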

Note

Recovery (see Recovering applications) is not supported for Avro-formatted Kafka streams.

For example, the following program would read from Samples.PosDataStream:

import java.io.File;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;

public class SimpleKafkaConsumer
{
    /**
     * This method issues exactly one fetch request to a Kafka topic/partition and prints
     * all the data from the response of the fetch request.
     * @param topic_name Topic to fetch messages from
     * @param schema_filename Avro schema file used for deserializing the messages
     * @param host_name Host where the Kafka broker is running
     * @param port Port on which the Kafka broker is listening
     * @param clientId Unique id of the client making the fetch request
     * @throws Exception
     */
    public void read(String topic_name, String schema_filename, String host_name, int port, String clientId) throws Exception
    {
        SimpleConsumer simpleConsumer = new SimpleConsumer(host_name, port, 100000, 64 * 1024, clientId);
        // This is just an example to read from partition 1 of a topic.
        int partitionId = 1;

        // Finds the first offset in the logs and starts fetching messages from that offset.
        long offset = getOffset(simpleConsumer, topic_name, partitionId, kafka.api.OffsetRequest.EarliestTime(), clientId);

        // Builds a fetch request.
        FetchRequestBuilder builder = new FetchRequestBuilder();
        builder.clientId(clientId);
        builder.addFetch(topic_name, partitionId, offset, 43264200);
        FetchRequest fetchRequest = builder.build();

        // Get the response of the fetch request.
        FetchResponse fetchResponse = simpleConsumer.fetch(fetchRequest);

        // Instantiates an Avro deserializer based on the schema file.
        SpecificDatumReader datumReader = getAvroReader(schema_filename);

        if (fetchResponse.hasError())
        {
            System.out.println("Error processing fetch request: Reason -> "+fetchResponse.errorCode(topic_name, 1));
        }
        else
        {
            ByteBufferMessageSet bbms = fetchResponse.messageSet(topic_name, partitionId);
            int count = 0;
            for (MessageAndOffset messageAndOffset : bbms)
            {
                ByteBuffer payload = messageAndOffset.message().payload();
                {
                    // The message format is Striim-specific and looks like this:
                    // 1. The first 4 bytes represent an integer (k) that gives the size of the actual message in the byte buffer.
                    // 2. The next k bytes store the actual message.
                    // 3. This logic runs in a while loop until the byte buffer limit is reached.
                    while (payload.hasRemaining())
                    {
                        int size = payload.getInt();

                        // if the size is invalid, or if there aren't enough bytes to process this chunk, then just bail.
                        if ((payload.position() + size > payload.limit()) || size == 0)
                        {
                            break;
                        }
                        else
                        {
                            byte[] current_bytes = new byte[size];
                            payload.get(current_bytes, 0, size);
                            BinaryDecoder recordDecoder = DecoderFactory.get().binaryDecoder(current_bytes, null);
                            GenericData.Record record = (GenericData.Record) datumReader.read(null, recordDecoder);
                            System.out.println(count++ +":"+record);
                        }
                    }
                }
            }
        }
    }

    private SpecificDatumReader getAvroReader(String schema_filename) throws Exception
    {
        File schemaFile = new File(schema_filename);
        if(schemaFile.exists() && !schemaFile.isDirectory())
        {
            Schema schema = new Schema.Parser().parse(schemaFile);
            SpecificDatumReader avroReader = new SpecificDatumReader(schema);
            return avroReader;
        }
        return null;
    }

    public long getOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName)
    {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo,
          kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);
        if (response.hasError())
        {
            System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition) );
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }

    public static void main(String[] args)
    {
        SimpleKafkaConsumer readKafka = new SimpleKafkaConsumer();
        String topic_name = "Samples_PosDataStream";
        String schema_filename = "./Platform/conf/Samples_PosDataStream_schema.avsc";
        try
        {
            readKafka.read(topic_name, schema_filename, "localhost", 9092, "ItsAUniqueClient");
        } catch (Exception e)
        {
            System.out.println(e);
        }
    }
}
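
To compile and run this example, the Apache Avro library and the Kafka client jar that provides the kafka.javaapi SimpleConsumer API (typically the older Scala-based kafka_2.x artifact) must be on the classpath; the exact artifact and version should match the Kafka release used by your Striim deployment. Running the main method with the broker listening on localhost:9092 prints each decoded record preceded by its index.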