Skip to main content

Using Apache Flume

Striim can receive data from Apache Flume using the WebActionSink (see Apache Flume integration) as a source. Its properties are defined in configuration files on the Flume server rather than in TQL.

The WebActionSink properties are:

property

value

notes

agent.sinks.webactionSink.type

com.webaction.flume.WebActionSink

agent.sinks.webactionSink.serverUri

watp:// <ip_address > :9080

the IP address and port of the Striim server (adjust the port number if you are not using the default)

agent.sinks.webactionSink.username

the Striim login to be used by the WebActionSink

agent.sinks.webactionSink.password

the password for that login

agent.sinks.webactionSink.stream

flume:<stream name>

specify the stream name to be used in TQL (see example below)

agent.sinks.webactionSink.parser.handler

DSVParser

in this release, only DSVParser is supported

You must also specify the properties for the specified parser. See the example below.

The following example application assumes that Flume is running on the same system as Striim.

1. Perform the first two steps described in Apache Flume integration.

2. Save the following as a TQL file, then load, deploy, and start it:

CREATE APPLICATION flumeTest;
CREATE STREAM flumeStream of Global.WAEvent;
CREATE TARGET flumeOut USING SysOut(name:flumeTest) INPUT FROM flumeStream;
END APPLICATION flumeTest;

This application does not need a CREATE SOURCE statement because the data is being collected and parsed by Flume. The stream name must match the one specified in the WebActionSink properties and the type must be Global.WAEvent.

2. Save the following as waflume.conf in the flume/conf directory, replacing the two IP addresses with the test system's IP address and the username and password with the credentials you used to load the application:

# Sources, channels and sinks are defined per agent, 
# in this case called 'agent'

agent.sources = netcatSrc
agent.channels = memoryChannel
agent.sinks = webactionSink

# For each one of the sources, the type is defined
agent.sources.netcatSrc.type = netcat
agent.sources.netcatSrc.bind = 192.168.1.2
agent.sources.netcatSrc.port = 41414

# The channel can be defined as follows.
agent.sources.netcatSrc.channels = memoryChannel

# Each sink's type must be defined
agent.sinks.webactionSink.type = com.webaction.flume.WebActionSink
agent.sinks.webactionSink.serverUri = watp://192.168.1.2:9080
agent.sinks.webactionSink.username = flumeusr
agent.sinks.webactionSink.password = passwd
agent.sinks.webactionSink.stream = flume:flumeStream
agent.sinks.webactionSink.parser.handler = DSVParser
agent.sinks.webactionSink.parser.blocksize = 256
agent.sinks.webactionSink.parser.columndelimiter = ","
agent.sinks.webactionSink.parser.rowdelimiter = "\n" 
agent.sinks.webactionSink.parser.charset = "UTF-8"
agent.sinks.webactionSink.parser.blockAsCompleteRecord = "True"

#Specify the channel the sink should use
agent.sinks.webactionSink.channel = memoryChannel

# Each channel's type is defined.
agent.channels.memoryChannel.type = memory

# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.memoryChannel.capacity = 100

3. Start Flume, specifying the configuration file:

bin/flume-ng agent --conf conf --conf-file conf/waflume.conf --name agent -
Dflume.root.logger=INFO,console

4. Save the following as flumetestdata.csv:

100,first
200,second
300,third

5. Open a terminal, change to the directory where you saved flumetestdata.csv, and enter the following command, replacing the IP address with the test system's:

cat flumetestdata.csv | nc 192.168.1.2 41414

The following output should appear in striim-node.log:

flumeTest: WAEvent{
  data: ["ID","Name"]
  metadata: {"RecordStatus":"VALID_RECORD","FileName":"","FileOffset":0}
  before: null
  dataPresenceBitMap: "AA=="
  beforePresenceBitMap: "AA=="
  typeUUID: null
};
flumeTest: WAEvent{
  data: ["100","first"]
  metadata: {"RecordStatus":"VALID_RECORD","FileName":"","FileOffset":0}
  before: null
  dataPresenceBitMap: "AA=="
  beforePresenceBitMap: "AA=="
  typeUUID: null
};
...

See Parsing the data field of WAEvent for more information about this data format.