Reading a Kafka stream with an external Kafka consumer
To read a Kafka stream's persisted data in an external application:
1. Include dataformat:'avro' in the stream's property set.

2. Generate an Avro schema file for the stream using the console command EXPORT <namespace>.<stream name>. This creates a schema file <namespace>_<stream name>_schema.avsc in the Striim program directory. Optionally, you may specify a path or file name using EXPORT <namespace>.<stream name> '<path>' '<file name>'. The .avsc extension is added automatically.

3. Copy the schema file to a location accessible by the external application (see the sketch after the note below).

4. In the external application, use the Zookeeper and broker addresses from the stream's property set, and reference the stream as <namespace>_<stream name>.
Note
Recovery (see Recovering applications) is not supported for Avro-formatted Kafka streams.
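Before wiring up a consumer, it can help to confirm that the exported schema file is readable from the external application's environment. The following is a minimal sketch, assuming the file produced by EXPORT Samples.PosDataStream was copied to the application's working directory; the path and stream name are illustrative placeholders:

    import java.io.File;
    import org.apache.avro.Schema;

    public class InspectExportedSchema {
        public static void main(String[] args) throws Exception {
            // Schema file produced by the console command: EXPORT Samples.PosDataStream
            // (illustrative path; adjust to wherever the file was copied)
            File schemaFile = new File("Samples_PosDataStream_schema.avsc");
            Schema schema = new Schema.Parser().parse(schemaFile);

            // Print the record name and its fields so the consumer code can be
            // written against the correct field names and types.
            System.out.println("Record: " + schema.getFullName());
            for (Schema.Field field : schema.getFields()) {
                System.out.println("  " + field.name() + " : " + field.schema().getType());
            }
        }
    }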
For example, the following program would read from Samples.PosDataStream:
import java.io.File;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;

public class SimpleKafkaConsumer {

    /**
     * Issues exactly one fetch request to a Kafka topic/partition and prints
     * all the data from the response of the fetch request.
     * @param topic_name      topic to fetch messages from
     * @param schema_filename Avro schema file used for deserializing the messages
     * @param host_name       host where the Kafka broker is running
     * @param port            port on which the Kafka broker is listening
     * @param clientId        unique id of the client issuing the fetch request
     * @throws Exception
     */
    public void read(String topic_name, String schema_filename, String host_name,
                     int port, String clientId) throws Exception {
        SimpleConsumer simpleConsumer =
            new SimpleConsumer(host_name, port, 100000, 64 * 1024, clientId);

        // This example reads from partition 1 of the topic.
        int partitionId = 1;

        // Find the first offset in the logs and start fetching messages from that offset.
        long offset = getOffset(simpleConsumer, topic_name, partitionId,
            kafka.api.OffsetRequest.EarliestTime(), clientId);

        // Build a fetch request.
        FetchRequestBuilder builder = new FetchRequestBuilder();
        builder.clientId(clientId);
        builder.addFetch(topic_name, partitionId, offset, 43264200);
        FetchRequest fetchRequest = builder.build();

        // Get the response of the fetch request.
        FetchResponse fetchResponse = simpleConsumer.fetch(fetchRequest);

        // Instantiate an Avro deserializer based on the schema file.
        SpecificDatumReader datumReader = getAvroReader(schema_filename);

        if (fetchResponse.hasError()) {
            System.out.println("Error processing fetch request: Reason -> "
                + fetchResponse.errorCode(topic_name, partitionId));
        } else {
            ByteBufferMessageSet bbms = fetchResponse.messageSet(topic_name, partitionId);
            int count = 0;
            for (MessageAndOffset messageAndOffset : bbms) {
                ByteBuffer payload = messageAndOffset.message().payload();
                // The message format is Striim-specific:
                // 1. The first 4 bytes hold an integer (k), the size of the actual message.
                // 2. The next k bytes hold the actual message.
                // 3. This repeats until the byte buffer limit is reached.
                while (payload.hasRemaining()) {
                    int size = payload.getInt();
                    // If the size is invalid, or there aren't enough bytes left to
                    // process this chunk, stop.
                    if ((payload.position() + size > payload.limit()) || size == 0) {
                        break;
                    }
                    byte[] current_bytes = new byte[size];
                    payload.get(current_bytes, 0, size);
                    BinaryDecoder recordDecoder =
                        DecoderFactory.get().binaryDecoder(current_bytes, null);
                    GenericData.Record record =
                        (GenericData.Record) datumReader.read(null, recordDecoder);
                    System.out.println(count++ + ":" + record);
                }
            }
        }
    }

    private SpecificDatumReader getAvroReader(String schema_filename) throws Exception {
        File schemaFile = new File(schema_filename);
        if (schemaFile.exists() && !schemaFile.isDirectory()) {
            Schema schema = new Schema.Parser().parse(schemaFile);
            SpecificDatumReader avroReader = new SpecificDatumReader(schema);
            return avroReader;
        }
        return null;
    }

    public long getOffset(SimpleConsumer consumer, String topic, int partition,
                          long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
            requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);
        if (response.hasError()) {
            System.out.println("Error fetching offset data from the broker. Reason: "
                + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }

    public static void main(String[] args) {
        SimpleKafkaConsumer readKafka = new SimpleKafkaConsumer();
        String topic_name = "Samples_PosDataStream";
        String schema_filename = "./Platform/conf/Samples_PosDataStream_schema.avsc";
        try {
            readKafka.read(topic_name, schema_filename, "localhost", 9092, "ItsAUniqueClient");
        } catch (Exception e) {
            System.out.println(e);
        }
    }
}
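The program above uses the older SimpleConsumer (kafka.javaapi) client, which later Kafka releases have deprecated. As an alternative, the following is a rough sketch of the same length-prefixed Avro decoding driven by the standard org.apache.kafka.clients.consumer.KafkaConsumer API. The broker address, group id, and schema path are illustrative placeholders, and the sketch assumes the kafka-clients 2.x poll(Duration) signature and the same Striim framing shown above:

    import java.io.File;
    import java.nio.ByteBuffer;
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class PosDataStreamPoller {
        public static void main(String[] args) throws Exception {
            // Parse the schema exported with EXPORT Samples.PosDataStream (illustrative path).
            Schema schema = new Schema.Parser()
                .parse(new File("Samples_PosDataStream_schema.avsc"));
            GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);

            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
            props.put("group.id", "pos-data-poller");         // placeholder group id
            props.put("auto.offset.reset", "earliest");
            props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                // The topic name follows the <namespace>_<stream name> convention.
                consumer.subscribe(Collections.singletonList("Samples_PosDataStream"));
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(5));
                for (ConsumerRecord<byte[], byte[]> kafkaRecord : records) {
                    // Each record value is assumed to use the same framing as above:
                    // a 4-byte length followed by that many bytes of Avro-encoded data,
                    // repeated until the buffer is exhausted.
                    ByteBuffer payload = ByteBuffer.wrap(kafkaRecord.value());
                    while (payload.remaining() >= 4) {
                        int size = payload.getInt();
                        if (size <= 0 || size > payload.remaining()) {
                            break; // invalid or truncated chunk
                        }
                        byte[] chunk = new byte[size];
                        payload.get(chunk);
                        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(chunk, null);
                        GenericRecord avroRecord = reader.read(null, decoder);
                        System.out.println(avroRecord);
                    }
                }
            }
        }
    }

Whichever client is used, the decoding loop is the same: read a 4-byte length, read that many bytes, hand them to the Avro reader, and repeat until the buffer is exhausted.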