Skip to main content

Striim Cloud 4.1.0 documentation

Parquet Parser

Apache Parquet is an open source columnar data file format designed for efficient data storage and retrieval. Parquet is built for complex nested data structures, and uses a record shredding and assembly algorithm. Parquet provides efficient data compression and encoding schemes with enhanced performance to handle complex data in bulk. The benefits of Parquet format include:

  • Fast queries that can fetch specific column values without reading full row data

  • Highly efficient column-wise compression

  • High compatibility with online analytical processing (OLAP)

Parquet is a popular format for data serialization in file systems that are often used with analytical data engines. Amazon S3 and many other cloud services support Parquet. It is good for queries that read particular columns from a wide (many column) table because only the needed columns are read, minimizing I/O operations.

For more information on Parquet, see the Apache Parquet documentation.

In this release, the Parquet Parser allows you to read Parquet-formatted files using FileReader, HDFSReader, and S3Reader. These sources correspond to files sources on a local filesystem, on a Hadoop Distributed File System (HDFS), or on Amazon S3. You can read both compressed and uncompressed files using the Parquet Parser without needing any configuration.

The output stream type of a source using Parquet Parser is ParquetEvent. See Writers overview for writers that can accept such a stream as input. When a writer's input is a ParquetEvent stream, it must use Avro Formatter or Parquet Formatter.

Example: configuring a Striim application with HDFSReader and Parquet Parser

Suppose that you have Parquet files in the following Hadoop directory path: "/data/parquet/files/".

You can create a Striim application with the HDFSReader and configure its details along with a Parquet Parser. For example:

CREATE SOURCE hadoopSource USING HDFSReader (
  hadoopconfigurationpath: 'IntegrationTests/TestData/hdfsconf/',
  hadoopurl: 'hdfs://dockerhost:9000/',
  directory: '/data/parquet/files/',
  wildcard: '*'
)
PARSE USING ParquetParser()
OUTPUT TO parquetStream;
Properties

property

type

default value

notes

RetryWait

String

1m

A time interval that specifies the wait between two retry attempts. With the default value 1m, the wait is one minute. Acceptable units of intervals: s,m,h,d. For example: RetryWait:'30s'.

Note that if the parser encounters a non-Parquet file or an incomplete Parquet file this means there may be a pause of three minutes before it skips the file and goes on to the next one. A list of skipped files is available in the Monitoring display and inMON output for the reader.

ParquetEvent

The ParquetEvent for the example Striim application has the following structure:

data of type org.apache.avro.GenericRecord
metadata of type Map
userdata of type Map

The data carries one row read from the Parquet file defined as an Avro GenericRecord.

The additional metadata that is appended along with source metadata are as follows:

  • BlockIndex: The block number which the record belongs to.

  • RecordIndex: The record number in a block.

You can use the userdata map to add user-defined information in a key-value format.

Example 1. User data sample
CREATE SOURCE fileSrc USING FileReader (
  directory:'/Users/Downloads/pems_sorted/',
  WildCard:'part-r-00282.snappy.parquet',
  positionByEOF:false
)
PARSE USING ParquetParser()
OUTPUT TO ParquetStreams;
 
CREATE STREAM CQStream1 of Global.parquetevent;
 
CREATE CQ CQ1
INSERT INTO CQStream1
SELECT PUTUSERDATA(
  s,
  'schemaName',
   s.data.getSchema().getName() 
)
FROM ParquetStreams s;
 
CREATE STREAM CQStream2 of Global.parquetevent;
 
CREATE CQ CQ2
INSERT INTO CQStream2
SELECT PUTUSERDATA(
  s2,
  'schemaNameExtended', 
  Userdata(s2,
  'schemaName').toString().concat(".PQ")
)
FROM CQStream1 s2;
 
CREATE TARGET t2 USING FileWriter(
  filename:'fileParquetTest',
  directory:'striim/%@userdata(schemaNameExtended)%',
  rolloverpolicy:'eventcount:100'
)
FORMAT USING ParquetFormatter ( schemafilename:'schemaPQ')
INPUT FROM CQStream2;


Compatible targets

The ParquetEvent can be handled directly in targets writing to a file or streaming into Apache Kafka. You can use file targets with ParquetFormatter or AvroFormatter with a dynamic directory configuration. When Apache Kafka is the target, the AvroFormatter is the only supported formatter and you must configure a schema registry.

Example 2. TQL sample: file target
create stream newParquetStream of Global.parquetevent;
CREATE CQ CQ1 
INSERT INTO newParquetStream 
SELECT PUTUSERDATA(
  s,
  'schemaName',
  s.data.getSchema().getName())
FROM parquetStream s;
 
CREATE TARGET fileTgt USING Global.FileWriter (
  filename: 'avroFiles',
  directory: 'avrodir/%@userdata(schemaName)%',
  rolloverpolicy: 'EventCount:1000,Interval:30s'
)
FORMAT USING Global.AvroFormatter  (
  schemaFileName: 'schemaAvro.avsc',
  formatAs: 'default' )
INPUT FROM newParquetStream;


Example 3. TQL sample: Kafka target
CREATE OR REPLACE TARGET kafkaTgt USING Global.KafkaWriter VERSION '0.11.0'(
  brokerAddress: 'localhost:9092',
  Topic: 'PqTopic1',
  Mode: 'Async' )
FORMAT USING Global.AvroFormatter  (
  schemaregistryurl: 'http://localhost:8081/',
  FormatAs: 'Default'
)
INPUT FROM parquetStream;