
Using Striim Open Processors

Creating an open processor component

A Striim open processor contains a custom Java application that reads data from a window or stream, processes it, optionally enriching it with data from a cache, and writes to an output stream.

The SDK, which you may download from github.com/striim/doc-downloads, includes the following:

The component must be built with Maven, since it requires the Maven Shade Plugin.

An open processor can be used only in the Striim namespace from which the types are exported.

The following simple example shows all the steps required to create an open processor and use it in a Striim application.

Step 1: define the input and output streams in Striim

The following TQL defines the input and output streams for the example open processor you will add later. It includes a FileReader source, a cache that will be specified in the open processor's ENRICH option, and a FileWriter target.

CREATE NAMESPACE ns1;
USE ns1;
CREATE APPLICATION OPExample;

CREATE source CsvDataSource USING FileReader (
  directory:'Samples/PosApp/appData',
  wildcard:'PosDataPreview.csv',
  positionByEOF:false
)
PARSE USING DSVParser (
  header:Yes,
  trimquote:false
)
OUTPUT TO CsvStream;
 
CREATE TYPE MerchantHourlyAve(
  merchantId String,
  hourValue integer,
  hourlyAve integer
);

CREATE CACHE HourlyAveLookup using FileReader (
  directory: 'Samples/PosApp/appData',
  wildcard: 'hourlyData.txt'
)
PARSE USING DSVParser (
  header: Yes,
  trimquote:false,
  trimwhitespace:true
) 
QUERY (keytomap:'merchantId') 
OF MerchantHourlyAve;

CREATE CQ CsvToPosData
INSERT INTO PosDataStream partition by merchantId
SELECT TO_STRING(data[1]) as merchantId,
  TO_DATEF(data[4],'yyyyMMddHHmmss') as dateTime,
  DHOURS(TO_DATEF(data[4],'yyyyMMddHHmmss')) as hourValue,
  TO_DOUBLE(data[7]) as amount,
  TO_INT(data[9]) as zip
FROM CsvStream;
 
CREATE CQ cq2
INSERT INTO SendToOPStream
SELECT makeList(dateTime) as dateTime,
  makeList(zip) as zip
FROM PosDataStream;
 
CREATE TYPE ReturnFromOPStream_Type ( time DateTime , val Integer );
CREATE STREAM ReturnFromOPStream OF ReturnFromOPStream_Type;

CREATE TARGET OPExampleTarget 
USING FileWriter (filename: 'OPExampleOut') 
FORMAT USING JSONFormatter() 
INPUT FROM ReturnFromOPStream;
 
END APPLICATION OPExample;
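To make the CsvToPosData CQ more concrete, here is a plain-Java sketch of the conversion it applies to each CSV row. It is not Striim code: the CsvToPosDataSketch and PosData classes and the convert method are hypothetical, the sample row values are made up, and it assumes that DHOURS returns the hour-of-day field of the parsed DateTime.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Plain-Java sketch of the per-row logic in the CsvToPosData CQ (not Striim API code).
public class CsvToPosDataSketch {

    // Same pattern string the TQL passes to TO_DATEF.
    static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");

    // Hypothetical holder for one PosDataStream event.
    static final class PosData {
        final String merchantId;
        final LocalDateTime dateTime;
        final int hourValue;
        final double amount;
        final int zip;

        PosData(String merchantId, LocalDateTime dateTime, int hourValue, double amount, int zip) {
            this.merchantId = merchantId;
            this.dateTime = dateTime;
            this.hourValue = hourValue;
            this.amount = amount;
            this.zip = zip;
        }
    }

    // data[] plays the role of the DSVParser's data array for one CSV row.
    static PosData convert(String[] data) {
        LocalDateTime dateTime = LocalDateTime.parse(data[4], FMT);
        return new PosData(
            data[1],                      // TO_STRING(data[1])
            dateTime,                     // TO_DATEF(data[4], 'yyyyMMddHHmmss')
            dateTime.getHour(),           // assumed equivalent of DHOURS(...)
            Double.parseDouble(data[7]),  // TO_DOUBLE(data[7])
            Integer.parseInt(data[9]));   // TO_INT(data[9])
    }

    public static void main(String[] args) {
        // Made-up row; only columns 1, 4, 7 and 9 are read.
        String[] row = {"BUSINESS NAME", "M001", "", "", "20130312173210", "", "", "2.20", "", "41363"};
        PosData p = convert(row);
        System.out.println(p.merchantId + " " + p.hourValue + " " + p.amount + " " + p.zip);
    }
}
```

Running main prints `M001 17 2.2 41363`.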

Step 2: export the input and output stream types

If you create OPExample in the ns1 namespace, the following Striim console command will export the types from the application to UploadedFiles/OpExampleTypes.jar:

EXPORT TYPES OF ns1.OPExample TO "UploadedFiles/OpExampleTypes.jar";

The EXPORT TYPES command requires read permission on the namespace. See Manage Striim - Files for instructions on downloading OpExampleTypes.jar.

Step 3: set up Maven

Install the SDK .jar file and the exported types .jar file in your local Maven repository:

mvn install:install-file -DgroupId=com.example -DartifactId=OpenProcessorSDK \
  -Dversion=1.0.0-SNAPSHOT -Dpackaging=jar -Dfile=/opt/striim/StriimSDK/StriimOpenProcessor-SDK.jar
mvn install:install-file -DgroupId=com.example -DartifactId=OPExample -Dversion=1.0.0-SNAPSHOT \
  -Dpackaging=jar -Dfile=/home/myhome/OpExampleTypes.jar

Create a Maven project in which you will create your custom Java application:

mvn archetype:generate -DgroupId=com.example.opexample -DartifactId=opexample \
  -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false

Replace the default pom.xml created by Maven with the following, adjusting as necessary for your environment:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.example.opexample</groupId>
  <artifactId>opexample</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>opexample</name>
  <url>http://maven.apache.org</url>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
<!-- OpenProcessorSDK jar -->
        <dependency>
            <groupId>com.example</groupId>
            <artifactId>OpenProcessorSDK</artifactId>
            <version>1.0.0-SNAPSHOT</version>
            <scope>provided</scope>
        </dependency>
<!-- exported types jar -->
        <dependency>
            <groupId>com.example</groupId>
            <artifactId>OPExample</artifactId>
            <version>1.0.0-SNAPSHOT</version>
            <scope>provided</scope>
        </dependency>
  </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-release-plugin</artifactId>
                <version>2.2.2</version>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <configuration>
                    <createDependencyReducedPom>false</createDependencyReducedPom>
<!-- 
The output SCM filename is defined here.
-->
                    <finalName>OPExample.scm</finalName>
                    <transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                            <manifestEntries>
                                <Striim-Module-Name>OPExample</Striim-Module-Name>
                                <Striim-Service-Interface>
                                  com.webaction.runtime.components.openprocessor.StriimOpenProcessor
                                </Striim-Service-Interface>
                                <Striim-Service-Implementation>
                                  com.example.opexample.App
                                </Striim-Service-Implementation>
                            </manifestEntries>
                        </transformer>
                    </transformers>
                    <artifactSet>
                        <excludes>
                            <exclude>org.slf4j:*</exclude>
                            <exclude>log4j:*</exclude>
                        </excludes>
                    </artifactSet>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>com.coderplus.maven.plugins</groupId>
                <artifactId>copy-rename-maven-plugin</artifactId>
                <version>1.0</version>
                <executions>
                    <execution>
                        <id>copy-file</id>
                        <phase>package</phase>
                        <goals>
                            <goal>copy</goal>
                        </goals>
<!--
The location and name of the .scm file to be loaded into Striim are defined here.
The preferred location is a modules folder under the Maven project's main folder.
sourceFile must match the shade plugin's finalName (OPExample.scm) plus the .jar extension.
-->
                        <configuration>
                            <sourceFile>/home/myhome/opexample/target/OPExample.scm.jar</sourceFile>
                            <destinationFile>/home/myhome/opexample/modules/OpExample.scm</destinationFile>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Step 4: write your Java application and build the .scm

Replace the default App.java with the following:

package com.example.opexample;

import wa.ns1.SendToOPStream_Type_1_0;
import wa.ns1.ReturnFromOPStream_Type_1_0;

import com.webaction.anno.AdapterType;
import com.webaction.anno.PropertyTemplate;
import com.webaction.anno.PropertyTemplateProperty;
import com.webaction.runtime.components.openprocessor.StriimOpenProcessor;
import com.webaction.runtime.containers.IBatch;
import com.webaction.runtime.containers.WAEvent;
import org.joda.time.DateTime;

import java.util.Iterator;
import java.util.Map;
import java.util.Random;

@PropertyTemplate(name = "TupleConverter", type = AdapterType.process,
    properties = {
        @PropertyTemplateProperty(name = "ahead", type = Integer.class, required = true, defaultValue = "0"),
        @PropertyTemplateProperty(name = "lastItemSeen", type = Boolean.class, required = true, defaultValue = "false")
    },
    // The names outputType and inputType are relative to Striim: outputType is the type sent
    // from a native Striim component to your custom component, and inputType is the type sent
    // from your custom component back to a native Striim component.
    outputType = SendToOPStream_Type_1_0.class,
    inputType = ReturnFromOPStream_Type_1_0.class
)
public class App extends StriimOpenProcessor
{
    // One generator for the life of the component rather than a new one per batch.
    private final Random rand = new Random();

    public void run() {
        // Events added to the input stream since the last call.
        IBatch<WAEvent> batch = getAdded();
        Iterator<WAEvent> it = batch.iterator();
        while (it.hasNext()) {
            SendToOPStream_Type_1_0 input = (SendToOPStream_Type_1_0) it.next().data;
            // ... additional operations on input
        }

        // Build one output event per batch and publish it downstream.
        ReturnFromOPStream_Type_1_0 output = new ReturnFromOPStream_Type_1_0();
        output.time = DateTime.now();
        output.val = rand.nextInt(50) + 1;  // random value from 1 to 50
        send(output);
    }

    public void close() throws Exception {
        // Nothing to release in this example.
    }

    public Map getAggVec() {
        // Not used in this example.
        return null;
    }

    public void setAggVec(Map aggVec) {
        // Not used in this example.
    }
}
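Because StriimOpenProcessor and the exported types are available only through the SDK and the exported jar, one approach to testing is to keep the per-batch logic in a plain method that run() calls, so it can be exercised without a Striim server. The following is a minimal sketch of that design, assuming hypothetical stand-in classes InEvent and OutEvent in place of SendToOPStream_Type_1_0 and ReturnFromOPStream_Type_1_0; it uses java.time.Instant where App.java uses Joda DateTime.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

public class BatchLogicSketch {

    // Hypothetical stand-in for SendToOPStream_Type_1_0.
    static final class InEvent {
        final List<Integer> zip;
        InEvent(List<Integer> zip) { this.zip = zip; }
    }

    // Hypothetical stand-in for ReturnFromOPStream_Type_1_0.
    static final class OutEvent {
        final Instant time;
        final int val;
        OutEvent(Instant time, int val) { this.time = time; this.val = val; }
    }

    // Pure per-batch logic with no Striim classes, so it can run in a unit test.
    // Mirrors the example: one output per batch with a random val from 1 to 50.
    static OutEvent processBatch(List<InEvent> batch, Random rand, Instant now) {
        for (InEvent in : batch) {
            // ... additional per-event operations on in.zip would go here
        }
        return new OutEvent(now, rand.nextInt(50) + 1);
    }

    public static void main(String[] args) {
        List<InEvent> batch = new ArrayList<>();
        batch.add(new InEvent(Arrays.asList(41363)));
        OutEvent out = processBatch(batch, new Random(), Instant.now());
        System.out.println("val = " + out.val);
    }
}
```

In App.run(), the equivalent call would convert the WAEvent payloads from getAdded(), call the plain method, copy the result into a ReturnFromOPStream_Type_1_0, and pass that to send().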

Change to the opexample directory created by Maven and enter mvn package.
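Before loading the .scm, you may want to confirm that the shade plugin wrote the Striim manifest entries configured in the pom. An .scm produced this way is a jar file, so the standard java.util.jar API can read it. This is a sketch; the ScmManifestCheck class and its default path are illustrative, and values are trimmed in case the pom's multi-line entries leave surrounding whitespace.

```java
import java.io.IOException;
import java.util.jar.Attributes;
import java.util.jar.JarFile;
import java.util.jar.Manifest;

public class ScmManifestCheck {

    // Manifest keys set by the ManifestResourceTransformer in the pom.
    static final String[] KEYS = {
        "Striim-Module-Name", "Striim-Service-Interface", "Striim-Service-Implementation"
    };

    // Returns the trimmed value of each key, or null where the key is missing.
    static String[] readEntries(String path) throws IOException {
        try (JarFile jar = new JarFile(path)) {
            String[] values = new String[KEYS.length];
            Manifest mf = jar.getManifest();
            if (mf == null) {
                return values;
            }
            Attributes attrs = mf.getMainAttributes();
            for (int i = 0; i < KEYS.length; i++) {
                String v = attrs.getValue(KEYS[i]);
                values[i] = (v == null) ? null : v.trim();
            }
            return values;
        }
    }

    public static void main(String[] args) throws IOException {
        String path = args.length > 0 ? args[0] : "modules/OpExample.scm";
        String[] values = readEntries(path);
        for (int i = 0; i < KEYS.length; i++) {
            System.out.println(KEYS[i] + ": " + (values[i] == null ? "MISSING" : values[i]));
        }
    }
}
```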

Step 5: import the .scm into Striim

Loading an open processor requires the Global.admin role (see Permissions).

Copy opexample/modules/OpExample.scm to a directory accessible by the Striim server, then use the following console command to load it:

LOAD OPEN PROCESSOR "<path>/OpExample.scm";

Alternatively, you may load it in Flow Designer at Configuration > App settings > Load / unload open processor.

In either case, when the application is restarted, Striim will reload the open processor from the same location.

Step 6: add the open processor to your application

Return to the application you created in step 1, open the Base Components section of the component palette, drag a Striim Open Processor into the workspace, set its options as follows, and click Save. Note that Ahead and Last Item Seen are defined by the Java class. The other properties will appear in all open processor components.

If you run the application, it will create output files in the striim directory.

Modifying an open processor

To modify your open processor, unload it in Flow Designer at Configuration > App settings > Load / unload open processor or using the command UNLOAD OPEN PROCESSOR "<path>/<file name>.scm";. Then make changes to the Java application, compile, and load the new .scm.

Supporting recovery of open processors

To ensure that your open processors are recoverable:

  • Use a send() method that includes event positions.

  • Do not mutate the event data.
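One way to follow the second rule is to treat each input payload as read-only and build a new object for anything you send. The following plain-Java sketch illustrates the pattern only; the NoMutationSketch and Reading classes and the scaleAmount method are hypothetical and do not come from the Striim SDK.

```java
public class NoMutationSketch {

    // Hypothetical payload; final fields make accidental in-place edits a compile error.
    static final class Reading {
        final String merchantId;
        final double amount;

        Reading(String merchantId, double amount) {
            this.merchantId = merchantId;
            this.amount = amount;
        }
    }

    // Derives a new output object and leaves the input untouched.
    static Reading scaleAmount(Reading in, double factor) {
        return new Reading(in.merchantId, in.amount * factor);
    }

    public static void main(String[] args) {
        Reading input = new Reading("M001", 100.0);
        Reading output = scaleAmount(input, 1.5);
        System.out.println(input.amount + " -> " + output.amount);  // 100.0 -> 150.0
    }
}
```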

Using send() functions

Open processors publish their results to downstream components using send() functions. There are three versions, and each affects how the platform handles the output, especially for recovery.

send(List<WAEvent> added, List<WAEvent> removed)

This method publishes data to a downstream component as a list of added and removed elements. It is typically used to send a single event as the lone element in the added list with an empty removed list. It may also be used when processing aggregations, indicating the events added to and removed from the aggregation.

The WAEvents in the lists have positions that indicate event ordering. The position is null when recovery is not in use but must have a valid value when using recovery. The position of each event must be strictly greater than that of the event before it, whether that event is earlier in the same batch or in a previous batch.

send(ITaskEvent batch)

Use this method to publish data to a downstream component as a batch. This method is commonly used by customers familiar with the Striim batching interface (added, removed, and snapshot). The ITaskEvent parameter is passed directly to the subscribing downstream components.

The ITaskEvent contains WAEvents whose positions indicate event ordering. The position is null when recovery is not in use but must have a valid value when using recovery. The position of each event must be strictly greater than that of the event before it, whether that event is earlier in the same batch or in a previous batch.

send(Object o)

Use this method to publish raw Object data to a downstream component. This method is commonly used when the Open Processor emits simple values and recovery is not used. The Object parameter will be packaged as the payload in a stream event and delivered to downstream subscribing components.

The Object will be assigned the batch position of the input batch. When a batch is delivered to the OP, the batch position is calculated and saved. Subsequent uses of this method will assign the previously calculated batch position to output events.

  • If recovery is not in use, the positions are null.

  • If each input batch comprises a single event, the batch position equals that event's position, which is unique; therefore output events have unique positions that are fully compatible with recovery.

  • If each input batch comprises multiple events but this method is called only once per batch, the batch position is applied only once, so it is unique and fully compatible with recovery.

Warning

If the input batch comprises multiple events and this method is called multiple times while processing that batch, each of the output events will have the same position: the input batch position. Recovery may not detect events that share a position, which can cause exactly-once processing to fail.
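The ordering rule for positions can be checked mechanically. The following plain-Java sketch models positions as long values, which is an assumption for illustration only (the actual WAEvent position type is not described here). It checks that positions strictly increase across batch boundaries and reproduces the situation in the warning, where send(Object) is called several times for one multi-event batch. The PositionOrderSketch class and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class PositionOrderSketch {

    // True if every position is strictly greater than the one before it,
    // counting across batch boundaries (positions modeled as longs).
    static boolean strictlyIncreasing(List<List<Long>> batches) {
        Long previous = null;
        for (List<Long> batch : batches) {
            for (Long pos : batch) {
                if (previous != null && pos <= previous) {
                    return false;
                }
                previous = pos;
            }
        }
        return true;
    }

    // Models send(Object): every output event gets the input batch's position.
    static List<Long> sendObjectPositions(long batchPosition, int callsPerBatch) {
        List<Long> out = new ArrayList<>();
        for (int i = 0; i < callsPerBatch; i++) {
            out.add(batchPosition);
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Long>> oncePerBatch = new ArrayList<>();
        oncePerBatch.add(sendObjectPositions(10L, 1));
        oncePerBatch.add(sendObjectPositions(20L, 1));

        List<List<Long>> manyPerBatch = new ArrayList<>();
        manyPerBatch.add(sendObjectPositions(10L, 3));

        System.out.println(strictlyIncreasing(oncePerBatch));  // true
        System.out.println(strictlyIncreasing(manyPerBatch));  // false
    }
}
```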

Loading and unloading open processors

The LOAD and UNLOAD commands require the Global.admin role.

Caution

If you unload an open processor, revise it, and load it again, do not change the name of the .scm file.

To load an open processor using the console, enter: LOAD OPEN PROCESSOR "<path>/<file name>.scm";

To load an open processor in the Flow Designer, select App Settings, enter <path>/<file name>.scm in the Load/Unload Open Processor field, and click Load.

To unload an open processor using the console, enter: UNLOAD OPEN PROCESSOR "<path>/<file name>.scm";

To unload an open processor in the Flow Designer, select App Settings, enter <path>/<file name>.scm in the Load/Unload Open Processor field, and click Unload.