
Using custom Java functions

TQL applications may import Java functions. The basic procedure is:

  • write a custom function

  • compile it to a .jar

  • add the .jar to .../Striim/lib

  • load the function into Striim

  • import the function to an application

Functions are loaded at the cluster level, so a new function is available for import to any application in any namespace. Imported functions required by an application are included as IMPORT statements in exported TQL.

The rest of this section describes in detail how to create and use custom functions.

Importing Striim JAR files into your IDE

To develop your own custom functions, copy the JAR files from Striim's library into your Java project so your IDE can resolve Striim's classes. The following instructions are for Eclipse. If you use another IDE, the procedure should be similar.

  1. Copy the Striim JAR files, available in Striim/lib, into your Java project so you can reference them in your code.

  2. In Eclipse, add the JARs as external JARs when you create the Java project. Otherwise, select the project name in the Project Explorer, select File > Properties, select Java Build Path, click the Libraries tab, and click Add External JARs.

Setting up packages and classes

The next step is to set up the following in your Java IDE:

  • A package to contain your custom abstract classes.

  • Import statements for required classes.

  • Abstract classes that will contain your custom functions.

  • A logger (recommended).

Caution

Do not use reserved keywords in your package name (see List of reserved keywords).

The following template will help you get started:

// You will need your own package name:
package com.mycompany.custom.packagename;

// Striim class you must import if you define an aggregate (multi-row)
// window function:
import com.webaction.runtime.compiler.custom.AggHandlerDesc;

// Log4j2 logging classes (recommended; Striim uses Log4j2 for its own logging)
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

// Any other classes you may need for your development
import java.util.LinkedList;
import org.joda.time.DateTime;

// Abstract class needed to contain your custom functions
public abstract class MyCustomFunctions {

  // Recommended logger
  private static final Logger logger = LogManager.getLogger( MyCustomFunctions.class );
}

You may download a sample MyCustomFunctions.java as MyCustomFunctions.zip from https://github.com/striim/doc-downloads.

Developing single-row functions

Single-row functions are the simplest type of Striim function. Start by defining your Java methods inside the custom abstract class you created (see Setting up packages and classes). A best practice is to process the values in a row as you iterate over them, without storing them, which reduces memory requirements and improves performance.

For example, the following function iterates through a row of values, keeping track of the highest value it has seen. When the loop reaches the end of the row, the function returns that value:

public static double maxSingleRow(double... n) {
  int    i=0;
  double runningmax = n[i];
 
  // Iterate through the row and continually update runningmax to the highest value
  while (++i < n.length)
    if (n[i] > runningmax)
      runningmax = n[i];
 
  return runningmax;
}

Generate your custom JAR and deploy it to your Striim environment (see Loading and unloading custom functions).

Now you can import the static methods of your custom class into TQL and use the new custom function:

USE StriimWindows_NS2;
DROP APPLICATION StriimWindows2 CASCADE;
 
IMPORT STATIC com.mycompany.custom.packagename.MyCustomFunctions.*;
 
CREATE APPLICATION StriimWindows2;
 
CREATE OR REPLACE CQ myStream_CQ
INSERT INTO myStream_ST
SELECT
      maxSingleRow( field1, field2, field3, field4 )
FROM
      inputStream_ST;
END APPLICATION StriimWindows2;

Here are a few other single-row functions you can include by defining the following methods in the same abstract class:

  • minSingleRow iterates through a row of values and keeps track of the lowest value

  • rangeSingleRow calculates the range of values in the row by using the previously created minSingleRow and maxSingleRow functions

  • avgSingleRow iterates through a row of values and calculates the average by maintaining a running total and dividing by the length

public static double minSingleRow(double... n) {
    int i = 0;
    double runningmin = n[i];
    // Iterate through the row and keep the lowest value seen so far
    while (++i < n.length)
        if (n[i] < runningmin)
            runningmin = n[i];
    return runningmin;
}

public static double rangeSingleRow(double... n) {
    // Range = highest value minus lowest value
    double dRange = maxSingleRow( n ) - minSingleRow( n );
    return dRange;
}

public static double avgSingleRow(double... n) {
    int i = 0;
    double runningsum = 0;
    // Keep a running total, then divide by the number of values
    while (i < n.length) {
        runningsum += n[i];
        i++;
    }
    return ( runningsum / i );
}
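As written, rangeSingleRow scans the row twice (once in maxSingleRow and once in minSingleRow), and these functions do not guard against an empty argument list: n[0] throws ArrayIndexOutOfBoundsException in the min and max functions, and avgSingleRow returns NaN. The following is a minimal sketch of a single-pass alternative in the iterate-without-storing style recommended above. The class name SingleRowRange and the method name rangeSinglePass are hypothetical, and returning NaN for an empty row is an assumption; in practice you would add the method to MyCustomFunctions.

```java
// Hypothetical standalone class; in practice, add the method to MyCustomFunctions.
final class SingleRowRange {

    // Computes max - min in one pass over the row, without storing values.
    // Returns NaN for an empty row (an assumption; choose what suits your app).
    public static double rangeSinglePass(double... n) {
        if (n == null || n.length == 0) {
            return Double.NaN;
        }
        double min = n[0];
        double max = n[0];
        for (int i = 1; i < n.length; i++) {
            if (n[i] < min) min = n[i];
            if (n[i] > max) max = n[i];
        }
        return max - min;
    }

    public static void main(String[] args) {
        System.out.println(rangeSinglePass(3.0, 9.5, -1.0, 4.0)); // 10.5
        System.out.println(rangeSinglePass());                     // NaN
    }
}
```

Tracking the minimum and maximum in the same loop halves the work for wide rows; in TQL the call would look like the maxSingleRow example above.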

Developing multi-row functions

Multi-row functions operate across multiple rows in a window. A best practice when designing multi-row functions is to use accumulators, which eliminate the need to iterate through all the rows in the window: as rows are added to and removed from the window, update the function's state incrementally. Iterating through all the rows is still possible, but maintaining state with accumulators performs better.

To design a class supporting multi-row functions:

  1. Ensure that you've imported the class needed to declare an aggregate function.

    import com.webaction.runtime.compiler.custom.AggHandlerDesc;
  2. Annotate the function declaration with @AggHandlerDesc, naming the handler class that implements the required accumulator methods. For example:

    @AggHandlerDesc(handler=AvgDebug_Float.class)
  3. Declare a public abstract method (Java does not allow abstract methods to be static) specifying the expected arguments and return type for the multi-row function. Striim converts the Java signature to an equivalent TQL signature. For example:

        public abstract Float AvgDebug( String sKey, Float fArg );
  4. Define a public static nested class whose name is identical to the one you specified in your handler. After declaring and initializing its state variables, your class must implement the following methods:

    • decAggValue()

    • incAggValue()

    • getAggValue()

The following example illustrates these basic steps in a class that maintains a running sum and count and returns, through its getAggValue() function, the average of the floating-point values currently in the window. The incAggValue() function adds a new value to the running sum, and decAggValue() removes a value from it:

    // Required handler declaration:
    // handler name must match the name of your static class. 
    @AggHandlerDesc(handler=AvgDebug_Float.class)

    // Name of the function to be exported to TQL:
    public abstract Float AvgDebug( String sKey, Float fArg );

    // Handler class implementing the required accumulators:
    // must match the name in the handler declaration.
    public static class AvgDebug_Float
    {
        // initializers
        float fRunningSum = 0;
        int iRunningCount = 0;
        
        // Calculates and returns the average value at any moment
        public Float getAggValue()
        {
            logger.debug( "RunningSum: " + fRunningSum + ", RunningCount: "
              + iRunningCount + "\n");
            return (iRunningCount == 0) ? 0 : (fRunningSum / iRunningCount);
        }
        // Adds the new value to the running total, maintaining the count
        // of values included:
        public void incAggValue(String sKey, Float fArg)
        {
            if(fArg != null) {
                iRunningCount++;
                fRunningSum += fArg;
            }
            logger.debug( "[Key: " + sKey + ", Value: " +  fArg + "] "
              + "RunningSum is now: " + fRunningSum + ",  RunningCount is now: "
              + iRunningCount );
        }

        // Removes the specified value from the running total, maintaining
        // the count of values included:
        public void decAggValue(String sKey, Float fArg)
        {
            if(fArg != null) {
                iRunningCount--;
                fRunningSum -= fArg;
            }
            logger.debug( "[Key: " + sKey + ", Value: " +  fArg + "] "
              + "RunningSum is now: " + fRunningSum + ",  RunningCount is now: "
              + iRunningCount );
        }
    }
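Because the handler is an ordinary Java class, one way to check its accumulator logic before deploying is to call incAggValue(), decAggValue(), and getAggValue() directly. The following is a hypothetical test harness, not part of the Striim API: it copies the AvgDebug_Float logic without the @AggHandlerDesc annotation and logger so it compiles without Striim's JARs, and the call sequence only approximates what Striim does at runtime.

```java
// Hypothetical off-cluster harness; not part of the Striim API.
class AvgDebugHarness {

    // Copy of the AvgDebug_Float accumulator, minus logging and the annotation.
    static class AvgDebug_Float {
        float fRunningSum = 0;
        int iRunningCount = 0;

        public Float getAggValue() {
            return (iRunningCount == 0) ? 0f : (fRunningSum / iRunningCount);
        }

        public void incAggValue(String sKey, Float fArg) {
            if (fArg != null) {
                iRunningCount++;
                fRunningSum += fArg;
            }
        }

        public void decAggValue(String sKey, Float fArg) {
            if (fArg != null) {
                iRunningCount--;
                fRunningSum -= fArg;
            }
        }
    }

    public static void main(String[] args) {
        AvgDebug_Float agg = new AvgDebug_Float();
        agg.incAggValue("1111", 100f);          // window: [100]
        agg.incAggValue("2222", 200f);          // window: [100, 200]
        agg.incAggValue("3333", null);          // null values are ignored
        System.out.println(agg.getAggValue());  // 150.0
        agg.decAggValue("1111", 100f);          // window: [200]
        System.out.println(agg.getAggValue());  // 200.0
    }
}
```

A harness like this makes it easy to confirm that null values and empty windows are handled as you intend before you load the JAR into Striim.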

The following TQL example illustrates the mapping between the Java code shown above and its equivalent class and function once deployed in the Striim environment:

USE StriimWindows_NS2;
DROP APPLICATION StriimWindows2 CASCADE;
 
IMPORT STATIC com.mycompany.custom.packagename.MyCustomFunctions.*;
 
CREATE APPLICATION StriimWindows2;
 
CREATE OR REPLACE CQ MovingAvg_CQ
INSERT INTO MovingAvg_ST
SELECT
     AvgDebug( OrderId, OrderValue )
FROM
     MovingAvg3ES_WN;
END APPLICATION StriimWindows2;

The following Java code implements a handler for an imported TQL function called LastBut, which returns the string located idx positions before the most recent string in the window (an idx of 0 returns the most recent string):

	@AggHandlerDesc(handler=LastBut_String.class)
    public abstract String LastBut(String arg, int idx);
    public static class LastBut_String
    {
        LinkedList<String> list = new LinkedList<String>();
        int i;
        
        // Returns the string located idx positions before the last one
        // (idx = 0 returns the last string), or null if idx is out of range:
        public String getAggValue() 
        { 
            if( list.isEmpty()  || (i < 0) || (i > (list.size()-1)) ) { 
                return null;
            } else {
                return list.get( ( list.size()-i-1) );
            }
        }
 
        // Adds a string to the end of the linked list, 
        //updating the index location:
        public void incAggValue(String arg, int idx) 
        { 
            list.addLast(arg);
            i = idx;  // no safety logic required here - 
                      // all handled by the getAggValue() method
        }


        // Removes the string located at the head of the linked list,
        // assuming it is not an empty list:
        public void decAggValue(String arg, int idx) 
        { 
            if(!list.isEmpty()) list.removeFirst();
        }
    }
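To make the indexing concrete, the following sketch replays a few calls against a copy of LastBut_String outside Striim. The harness class name and the sample values are hypothetical. It shows that an index of 0 returns the most recent value, 1 returns the value before it, and an out-of-range index returns null.

```java
import java.util.LinkedList;

// Hypothetical harness; copies the LastBut_String handler without the annotation.
class LastButHarness {

    static class LastBut_String {
        LinkedList<String> list = new LinkedList<String>();
        int i;

        public String getAggValue() {
            if (list.isEmpty() || (i < 0) || (i > (list.size() - 1))) {
                return null;
            }
            return list.get(list.size() - i - 1);
        }

        public void incAggValue(String arg, int idx) {
            list.addLast(arg);
            i = idx;
        }

        public void decAggValue(String arg, int idx) {
            if (!list.isEmpty()) list.removeFirst();
        }
    }

    public static void main(String[] args) {
        LastBut_String h = new LastBut_String();
        h.incAggValue("a", 1);
        h.incAggValue("b", 1);
        h.incAggValue("c", 1);                 // window: [a, b, c], idx = 1
        System.out.println(h.getAggValue());   // b (one before the last)
        h.decAggValue("a", 1);                 // window: [b, c]
        h.incAggValue("d", 0);                 // window: [b, c, d], idx = 0
        System.out.println(h.getAggValue());   // d (the last value)
        h.incAggValue("e", 5);                 // idx larger than the window
        System.out.println(h.getAggValue());   // null
    }
}
```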

In the example above, the handler class is LastBut_String, and the function you call in TQL is LastBut. Following this pattern, you can create a collection of overloaded LastBut functions based on Java's generic LinkedList class, substituting other types for String, such as Byte, Short, Integer, Long, Float, Double, Boolean, DateTime, and even Object. For example, the following code extends the pattern to a linked list of Byte objects:

    @AggHandlerDesc(handler=LastBut_Byte.class)
    public abstract Byte LastBut(Byte arg, int idx);
    public static class LastBut_Byte
    {
        LinkedList<Byte> list = new LinkedList<Byte>();
        int i;
        
        public Byte getAggValue() 
        { 
            if( list.isEmpty()  || (i < 0) || (i > (list.size()-1)) ) { 
                return null;
            } else {
                return list.get( ( list.size()-i-1) );
            }
        }
        public void incAggValue(Byte arg, int idx) 
        { 
            list.addLast(arg);
            i = idx;  // no safety logic required here - 
                      // all handled by the getAggValue() method
        }
        public void decAggValue(Byte arg, int idx) 
        { 
            if(!list.isEmpty()) list.removeFirst();
        }
    }

Understanding windows in custom functions

Multi-row functions usually operate on sliding windows of data. To understand how Striim calls a custom aggregate function as a window slides, consider the following complete dataset:

OrderID   OrderDate   OrderValue
1111      1/1/2016    111.11
2222      2/1/2016    222.22
3333      3/1/2016    333.33
4444      4/1/2016    444.44
5555      5/1/2016    555.55
6666      6/1/2016    666.66
7777      7/1/2016    777.77
8888      8/1/2016    888.88
9999      9/1/2016    999.99

The following describes how the dataset is processed by a sliding window that holds at most 3 rows at a time.

Initialization

When the handler object is initialized, there is no working record (row of data) yet, and the window is empty. Consider the following TQL statement:

SELECT
  AVG(OrderValue),
  OrderID,
  OrderDate,
  OrderValue
FROM
  STREAM_ST

As you learned in Developing multi-row functions, the getAggValue() function defines return values, the incAggValue() function defines incremental logic, and decAggValue() defines decremental logic.

At the time of initialization, the running sum and count for the data would each be zero.

First row

Using the dataset above, Striim calls incAggValue() with the first row, which adds its value to the running sum and count, and then calls getAggValue(), which returns the average.

Here is the state of the data at this point. The working record is:

OrderID   OrderDate   OrderValue
1111      1/1/2016    111.11

The window is:

OrderID   OrderDate   OrderValue
1111      1/1/2016    111.11

Sum: 111.11

Count: 1

AVG: 111.11

Second row

Using the dataset above, Striim calls incAggValue() with the second row, which adds its value to the running sum and count, and then calls getAggValue(), which returns the average.

Here is the state of the data at this point. The working record is:

OrderID   OrderDate   OrderValue
2222      2/1/2016    222.22

The window is:

OrderID   OrderDate   OrderValue
1111      1/1/2016    111.11
2222      2/1/2016    222.22

Sum: 333.33

Count: 2

AVG: 166.665

Third row

Using the dataset above, Striim calls incAggValue() with the third row, which adds its value to the running sum and count, and then calls getAggValue(), which returns the average.

Here is the state of the data at this point. The working record is:

OrderID   OrderDate   OrderValue
3333      3/1/2016    333.33

The window is:

OrderID   OrderDate   OrderValue
1111      1/1/2016    111.11
2222      2/1/2016    222.22
3333      3/1/2016    333.33

Sum: 666.66

Count: 3

AVG: 222.22

Fourth row

Up to now, the window had room for each new row, so Striim called incAggValue() to add the new row's value to the running sum and count and then called getAggValue() to return the average. When the fourth row arrives, the window already holds 3 rows, so the sequence changes: decAggValue() is called to remove the first row's value from the running sum and count, incAggValue() adds the fourth row's value, and getAggValue() returns the new average.

Here is the state of the data at this point. The working record is:

OrderID   OrderDate   OrderValue
4444      4/1/2016    444.44

The window now only includes rows 2-4 and excludes row 1, and the count holds steady at 3 rows:

OrderID   OrderDate   OrderValue
2222      2/1/2016    222.22
3333      3/1/2016    333.33
4444      4/1/2016    444.44

Sum: 999.99

Count: 3

AVG: 333.33

Toward the end

The pattern continues as new rows arrive: the oldest row is removed to make room for each new row, and the running values are updated. Eventually, when input events stop arriving and the events in the window begin to time out, only the decAggValue() and getAggValue() functions are called.
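The walkthrough above can be reproduced with a small simulation. The following is a minimal sketch that assumes a count-based window of 3 rows and the call order described in this section (decAggValue() for the oldest row when the window is full, then incAggValue() for the new row, then getAggValue()). It is not Striim's window implementation and does not model time-based expiry. The class names are hypothetical, and the accumulator mirrors AvgDebug_Float but keeps a double sum to reduce rounding error.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical simulation of a three-row sliding window; not Striim's runtime.
class SlidingWindowDemo {

    // Accumulator following the same pattern as AvgDebug_Float.
    static class RunningAvg {
        double sum = 0;
        int count = 0;

        void incAggValue(double v) { sum += v; count++; }
        void decAggValue(double v) { sum -= v; count--; }
        double getAggValue() { return (count == 0) ? 0 : sum / count; }
    }

    // Feeds each value through a count-based window of the given size and
    // returns the aggregate value reported after each row arrives.
    static double[] run(double[] values, int windowSize) {
        Deque<Double> window = new ArrayDeque<>();
        RunningAvg agg = new RunningAvg();
        double[] results = new double[values.length];
        for (int r = 0; r < values.length; r++) {
            if (window.size() == windowSize) {
                // Window is full: remove the oldest row's value first.
                agg.decAggValue(window.removeFirst());
            }
            window.addLast(values[r]);
            agg.incAggValue(values[r]);
            results[r] = agg.getAggValue();
            System.out.printf("row %d: sum=%.2f count=%d avg=%.3f%n",
                    r + 1, agg.sum, agg.count, results[r]);
        }
        return results;
    }

    public static void main(String[] args) {
        double[] orderValues = {111.11, 222.22, 333.33, 444.44, 555.55,
                                666.66, 777.77, 888.88, 999.99};
        run(orderValues, 3);
    }
}
```

The state per row is just a sum and a count, so each new row costs a constant amount of work no matter how large the window is; only the deque of window contents, which Striim maintains itself, grows with the window size.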

Understanding logging in custom functions

We recommend that you use org.apache.logging.log4j ("Log4j2") for your logging, as that is what Striim uses with its own functions.

Configuring Log4j

Edit the conf/log4j.server.properties file and add the following new section:

log4j.logger.com.mycompany.custom.packagename.MyCustomFunctions=debug, MyCustomFunctionsAppender
log4j.additivity.com.mycompany.custom.packagename.MyCustomFunctions=false
log4j.appender.MyCustomFunctionsAppender=org.apache.log4j.FileAppender
log4j.appender.MyCustomFunctionsAppender.File=logs/MyCustomFunctions.log.out
# Define the layout for file appender
log4j.appender.MyCustomFunctionsAppender.layout=org.apache.log4j.EnhancedPatternLayout
log4j.appender.MyCustomFunctionsAppender.layout.conversionPattern=%-6p:%-15M:%m%n

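In this example, a separate logs/MyCustomFunctions.log.out file will be created. Its log level will be debug, and each entry will contain the log level, method name, and message. Because additivity is set to false, these messages are not also sent to the parent loggers' appenders. Some examples of good log entries: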

  • logger.trace("execute() started");

  • logger.debug(COMMANDS, "{} received {}", name, param);

  • logger.debug( "sending encrypted request, length = {}\n{}", () -> fullEncryptedRequest.length, () -> Arrays.toString(fullEncryptedRequest) );

In the following Java class, the logger is used to output the running sum and count values:

    @AggHandlerDesc(handler=AvgDebug_Float.class)
    public abstract Float AvgDebug( String sKey, Float fArg );
    public static class AvgDebug_Float
    {
        float fRunningSum = 0;
        int iRunningCount = 0;
        
        public Float getAggValue()
        {
            logger.debug( "RunningSum: {},  RunningCount: {}\n", fRunningSum, iRunningCount);
            return (iRunningCount == 0) ? 0 : (fRunningSum / iRunningCount);
        }
        public void incAggValue(String sKey, Float fArg)
        {
            if(fArg != null) {
            	iRunningCount++;
                fRunningSum += fArg;
            }
            logger.debug( "[Key: {}, Value: {}] RunningSum is now: {},  RunningCount is now: {}",
              sKey, fArg, fRunningSum, iRunningCount);
        }
        public void decAggValue(String sKey, Float fArg)
        {
            if(fArg != null) {
            	iRunningCount--;
                fRunningSum -= fArg;
            }
            logger.debug( "[Key: {}, Value: {}] RunningSum is now: {},  RunningCount is now: {}",
              sKey, fArg, fRunningSum, iRunningCount);
        }
    }

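If you build a log message by concatenation, as in the first AvgDebug_Float example, consider using a StringBuilder (the unsynchronized counterpart of StringBuffer) and checking logger.isDebugEnabled() first, so the message is built only when it will actually be written. Parameterized messages such as those above defer formatting until the message is logged.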

Set the log level

Striim supports changing log levels dynamically, without restarting the server. To set the log level to debug for your custom class:

set loglevel = {com.mycompany.custom.packagename.MyCustomFunctions : debug};

See Changing the log level for more information.

Calling a shell script from a custom Java function

The following custom function calls a shell script whose path is passed in from TQL.

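import com.webaction.runtime.compiler.custom.Nondeterministic;
import java.io.IOException;
...

// @Nondeterministic makes Striim call the function each time the CQ runs
@Nondeterministic
public static int kickShell(String args) {
    // Refuse anything that is not a .sh file; -1 signals an error
    if (args == null || !args.endsWith(".sh")) {
        System.out.println("Error: " + args + " is not a shell script.");
        return -1;
    }
    try {
        // Run the script with sh; inheriting the JVM's standard streams means
        // script output cannot fill an unread pipe and block waitFor()
        Process process = new ProcessBuilder("sh", args).inheritIO().start();
        int ret = process.waitFor();
        System.out.println("return: " + ret);
        return ret;
    } catch (IOException e) {
        // The script could not be started
        e.printStackTrace();
        return -1;
    } catch (InterruptedException e) {
        // Restore the interrupt flag so the caller can see it
        Thread.currentThread().interrupt();
        e.printStackTrace();
        return -1;
    }
}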

You could use that in TQL as follows:

CREATE CQ ...
SELECT kickShell('/etc/striim/test.sh') as retVal ...

The @Nondeterministic annotation, imported from com.webaction.runtime.compiler.custom.Nondeterministic, causes the script to be executed every time the CQ runs. Without it, the script would be called only once, on deployment.

Warning

Consider the potential security issues of allowing TQL applications to call arbitrary scripts on the Striim host system. Depending on your environment and requirements, it may be more appropriate to hard-code a specific script in the custom Java function rather than accept a script path from TQL.
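One way to follow that advice is to keep the choice of script inside Java: TQL passes only a short name, and the function looks it up in a fixed allow-list. The following is a minimal sketch under that assumption. The class name, the script names, and the /opt/scripts/... paths are hypothetical, and in a Striim function you would also add the @Nondeterministic annotation used by kickShell.

```java
import java.io.IOException;
import java.util.Map;

// Hypothetical sketch: TQL can only select from scripts fixed in this map.
class AllowListedShell {

    // Allow-list of script names to absolute paths (example paths only).
    private static final Map<String, String> SCRIPTS = Map.of(
            "rotate", "/opt/scripts/rotate.sh",
            "notify", "/opt/scripts/notify.sh");

    // Returns the script's exit status, or -1 if the name is not allowed
    // or the script could not be run.
    public static int runNamedScript(String name) {
        String path = (name == null) ? null : SCRIPTS.get(name);
        if (path == null) {
            return -1;   // unknown name: nothing is executed
        }
        try {
            Process process = new ProcessBuilder("sh", path)
                    .inheritIO()   // share the JVM's standard streams
                    .start();
            return process.waitFor();
        } catch (IOException e) {
            return -1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
    }

    public static void main(String[] args) {
        System.out.println(runNamedScript("../../bin/anything")); // -1
    }
}
```

In TQL you would then pass a name such as 'rotate' instead of a path, so a compromised or careless application cannot point the function at an arbitrary file.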

Loading and unloading custom functions

Functions are loaded at the cluster level, so they are available for import to any application in any namespace. Imported functions are included as IMPORT statements in exported TQL. The LOAD and UNLOAD commands require the Global.admin role.

Before loading your custom function, copy its .jar file to striim/lib. Whenever Striim is restarted, the function will be loaded automatically.

To load the function using the console, enter LOAD "lib/<file name>.jar";

To import a loaded function in the console or a TQL file, use IMPORT STATIC <package name>.<class name>.*; for example, IMPORT STATIC com.mycompany.custom.packagename.MyCustomFunctions.*;

To load and import a custom function in the web UI, go to the Flow Designer and select Configuration > App Settings > Java Package > Add Another Package, enter IMPORT STATIC <package name> in the Java Package field, and click Save.

To unload a custom function, in the console enter UNLOAD "lib/<file name>.jar";

To stop the function from reloading when Striim is restarted, delete its .jar file from striim/lib.