
CREATE STREAM <name> OF <type name> 
[ PARTITION BY { <field name>, ... | <expression> } ]
[ PERSIST USING <property set> ]
[ GRACE PERIOD <integer> { SECOND | MINUTE | HOUR | DAY } ON <field name> ];

Creates a stream using a previously defined type.


A stream and its type may also be created as part of a CREATE CQ declaration. See Parsing the data field of WAEvent for an example.

For example, PosApp defines the MerchantTxRate type and creates a stream based on that type:

CREATE TYPE MerchantTxRate(
  merchantId String KEY,
  zip String,
  startTime DateTime,
  count integer,
  totalAmount double,
  hourlyAve integer,
  upperLimit double,
  lowerLimit double,
  category String,
  status String
);

CREATE STREAM MerchantTxRateOnlyStream OF MerchantTxRate PARTITION BY merchantId;

GRACE PERIOD should be used when data may be received out of order. For example, if you were collecting log data from servers all over the world, network latency might result in events with a logTime value of 1427890320 arriving from a nearby server before events with a timestamp of 1427890319 (one second earlier) from a server on another continent. If you knew that the maximum latency was two seconds, you could use the clause GRACE PERIOD 2 SECOND ON logTime to ensure that all events are processed in order. The events of the stream would then be buffered for two seconds, continuously sorted based on the logTime value, and passed to the next component in the application. If the time interval is too short, any out-of-order events received too late to be sorted in the correct order are discarded. Without the GRACE PERIOD option, out-of-order events are processed as they arrive, which may result in incorrect calculations.

For example, this sample sorter for out-of-order events (the name LogTimeSorter is illustrative) is paired with streams created with a two-second grace period on logTime:

CREATE SORTER LogTimeSorter OVER
Stream1 ON logTime OUTPUT TO Stream1Sorted,
Stream2 ON logTime OUTPUT TO Stream2Sorted
WITHIN 2 second
OUTPUT ERRORS TO fooErrorStream;
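
The input streams for such a sorter would be declared with a matching grace period, following the syntax shown at the top of this page. A minimal sketch, in which the type name LogEntryType is an illustrative assumption:

CREATE STREAM Stream1 OF LogEntryType
  GRACE PERIOD 2 SECOND ON logTime;
CREATE STREAM Stream2 OF LogEntryType
  GRACE PERIOD 2 SECOND ON logTime;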

Persisting a stream to Kafka


Persistent streams are available as an add-on for Striim Cloud Enterprise (see Add-on features) and by default in Striim Cloud Mission Critical.

For an overview of this feature, see Kafka-persisted streams.


Kafka and Zookeeper must be running when you create a Kafka-persisted stream, persist an existing stream, or import an application containing one.

CREATE STREAM <name> OF <type> PERSIST [USING <property set>];

To enable replay of a stream by persisting it to Kafka (see Replaying events using Kafka streams), use the syntax CREATE STREAM <name> OF <type> PERSIST USING <namespace>.<property set>, where <property set> is the name of a set of Kafka server properties. To persist to a Striim cluster's integrated Kafka broker, use the property set Global.DefaultKafkaProperties, for example:

CREATE STREAM MyStream OF MyStreamType PERSIST USING Global.DefaultKafkaProperties;
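
To persist to an external Kafka cluster instead of the integrated broker, you would reference a property set of your own containing that cluster's connection details. A minimal sketch, assuming the property names zk.address and bootstrap.brokers and placeholder addresses (verify the required property names against your Striim release before use):

CREATE PROPERTYSET MyKafkaProperties (
  zk.address: '198.51.100.5:2181',
  bootstrap.brokers: '198.51.100.5:9092'
);

CREATE STREAM MyStream OF MyStreamType PERSIST USING MyKafkaProperties;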

This memory-resident stream may be used in the usual way in a window or CQ. Alternatively, the persisted data may be read by KafkaReader using topic name <namespace>_<stream name> (see Reading a Kafka stream with KafkaReader). To use persisted stream data from the integrated Kafka broker outside of Striim, see Reading a Kafka stream with an external Kafka consumer.
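
As a sketch of the KafkaReader approach, assuming a stream MyStream in namespace MyNamespace persisted to the integrated broker on localhost (the broker address, adapter version, and output stream name are illustrative assumptions, and the parser required depends on how the persisted data was serialized):

CREATE SOURCE PersistedStreamReader USING KafkaReader VERSION '0.11.0' (
  brokerAddress: 'localhost:9092',
  Topic: 'MyNamespace_MyStream'
)
OUTPUT TO ReplayStream;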

If a persisted stream is created in an application or flow with encryption enabled (see CREATE APPLICATION ... END APPLICATION), it will be encrypted. It may be read by another application that does not have encryption enabled.


  • Kafka streams may be used only on the output of a source or the output of a CQ that parses a source.

  • Implicit streams may not be persisted to Kafka.

  • In an application or flow running in a Forwarding Agent, a source or CQ may output to a Kafka stream, but any further processing of that stream must take place on the Striim server.

  • If the Kafka broker configuration delete.topic.enable is false (the default for Kafka 0.11 and all other releases prior to 1.0.0), then after you terminate and drop a Striim application containing a Kafka stream, reloading the application will fail when it tries to recreate the stream. To avoid this, set delete.topic.enable=true.

Because implicit streams may not be persisted, the Kafka stream must be explicitly created before the source or CQ that populates it. Using MultiLogApp as an example, to persist the raw output of the access log source:

CREATE STREAM RawAccessStream OF Global.WAEvent
  PERSIST USING Global.DefaultKafkaProperties;

CREATE SOURCE AccessLogSource USING FileReader (
  ...
)
OUTPUT TO RawAccessStream;

Alternatively, to persist the output of the CQ that parses that raw output:

CREATE TYPE AccessLogEntry (
    srcIp String KEY,
    ...
);
CREATE STREAM AccessStream OF AccessLogEntry
  PERSIST USING Global.DefaultKafkaProperties;

CREATE CQ ParseAccessLog 
INSERT INTO AccessStream
SELECT data[0] ...
FROM RawAccessStream;

To distribute events among multiple Kafka partitions, use PARTITION BY <field>:

CREATE STREAM AccessStream OF AccessLogEntry
  PARTITION BY srcIp
  PERSIST USING Global.DefaultKafkaProperties;

All events with the same value in <field> will be written to the same randomly selected Kafka partition. Striim will distribute the data evenly among the partitions to the extent allowed by the frequency of the <field> values. For example, if 80% of the events have the same <field> value, then one of the Kafka partitions will contain 80% of the events.

By default, events may be distributed among up to 200 Kafka partitions.

Dropping a persisted stream will automatically delete the associated Kafka topics.

If recovery (see Recovering applications) is enabled for an application containing a Kafka stream, the persisted data will include "CheckPoint" events used by the recovery process.