Using Apache Flume
Striim can use Apache Flume as a source by receiving its data through the WebActionSink (see Apache Flume integration). The WebActionSink's properties are defined in configuration files on the Flume server rather than in TQL.
The WebActionSink properties are:
property | value | notes |
---|---|---|
agent.sinks.webactionSink.type | com.webaction.flume.WebActionSink | |
agent.sinks.webactionSink.serverUri | for example, watp://192.168.1.2:9080 | the IP address and port of the Striim server (adjust the port number if you are not using the default) |
agent.sinks.webactionSink.username | the Striim login to be used by the WebActionSink | |
agent.sinks.webactionSink.password | the password for that login | |
agent.sinks.webactionSink.stream | for example, flume:flumeStream | specify the stream name to be used in TQL (see example below) |
agent.sinks.webactionSink.parser.handler | DSVParser | in this release, only DSVParser is supported |
You must also specify the properties for the specified parser. See the example below.
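As a quick sanity check before starting Flume, a shell function along the following lines could confirm that a configuration file sets every property listed in the table. This is a minimal sketch, not part of Striim or Flume: the function name check_webaction_sink is hypothetical, and it assumes the agent is named agent and the sink is named webactionSink, as in the table.

```shell
# Hypothetical helper (not part of Striim or Flume): report any WebActionSink
# property from the table above that is missing from a Flume configuration file.
check_webaction_sink() {
  conf_file="$1"
  missing=0
  for key in type serverUri username password stream parser.handler; do
    # Look for "agent.sinks.webactionSink.<key> =" at the start of a line.
    # The dot in "parser.handler" is read as a regex wildcard, which is harmless here.
    if ! grep -Eq "^[[:space:]]*agent\.sinks\.webactionSink\.${key}[[:space:]]*=" "$conf_file"; then
      echo "missing: agent.sinks.webactionSink.${key}"
      missing=1
    fi
  done
  # Exit status 0 means every property was found; 1 means at least one is missing.
  return "$missing"
}
```

Calling the function with the path to a configuration file prints one line per missing property and returns a non-zero status if any are absent. It does not check the parser properties, which depend on the parser chosen.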
The following example application assumes that Flume is running on the same system as Striim.
1. Perform the first two steps described in Apache Flume integration.
2. Save the following as a TQL file, then load, deploy, and start it:
    CREATE APPLICATION flumeTest;
    CREATE STREAM flumeStream of Global.WAEvent;
    CREATE TARGET flumeOut USING SysOut(name:flumeTest) INPUT FROM flumeStream;
    END APPLICATION flumeTest;
This application does not need a CREATE SOURCE statement because the data is being collected and parsed by Flume. The stream name must match the one specified in the WebActionSink properties and the type must be Global.WAEvent.
3. Save the following as waflume.conf in the flume/conf directory, replacing the two IP addresses with the test system's IP address and the username and password with the credentials you used to load the application:
    # Sources, channels and sinks are defined per agent,
    # in this case called 'agent'
    agent.sources = netcatSrc
    agent.channels = memoryChannel
    agent.sinks = webactionSink
    # For each one of the sources, the type is defined
    agent.sources.netcatSrc.type = netcat
    agent.sources.netcatSrc.bind = 192.168.1.2
    agent.sources.netcatSrc.port = 41414
    # The channel can be defined as follows.
    agent.sources.netcatSrc.channels = memoryChannel
    # Each sink's type must be defined
    agent.sinks.webactionSink.type = com.webaction.flume.WebActionSink
    agent.sinks.webactionSink.serverUri = watp://192.168.1.2:9080
    agent.sinks.webactionSink.username = flumeusr
    agent.sinks.webactionSink.password = passwd
    agent.sinks.webactionSink.stream = flume:flumeStream
    agent.sinks.webactionSink.parser.handler = DSVParser
    agent.sinks.webactionSink.parser.blocksize = 256
    agent.sinks.webactionSink.parser.columndelimiter = ","
    agent.sinks.webactionSink.parser.rowdelimiter = "\n"
    agent.sinks.webactionSink.parser.charset = "UTF-8"
    agent.sinks.webactionSink.parser.blockAsCompleteRecord = "True"
    # Specify the channel the sink should use
    agent.sinks.webactionSink.channel = memoryChannel
    # Each channel's type is defined.
    agent.channels.memoryChannel.type = memory
    # Other config values specific to each type of channel(sink or source)
    # can be defined as well
    # In this case, it specifies the capacity of the memory channel
    agent.channels.memoryChannel.capacity = 100
4. Start Flume, specifying the configuration file:
    bin/flume-ng agent --conf conf --conf-file conf/waflume.conf --name agent -Dflume.root.logger=INFO,console
5. Save the following as flumetestdata.csv:
    100,first
    200,second
    300,third
6. Open a terminal, change to the directory where you saved flumetestdata.csv, and enter the following command, replacing the IP address with the test system's:
    cat flumetestdata.csv | nc 192.168.1.2 41414
The following output should appear in striim-node.log:
    flumeTest: WAEvent{
      data: ["ID","Name"]
      metadata: {"RecordStatus":"VALID_RECORD","FileName":"","FileOffset":0}
      before: null
      dataPresenceBitMap: "AA=="
      beforePresenceBitMap: "AA=="
      typeUUID: null
    };
    flumeTest: WAEvent{
      data: ["100","first"]
      metadata: {"RecordStatus":"VALID_RECORD","FileName":"","FileOffset":0}
      before: null
      dataPresenceBitMap: "AA=="
      beforePresenceBitMap: "AA=="
      typeUUID: null
    };
    ...
See Parsing the data field of WAEvent for more information about this data format.
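To confirm that the test rows arrived without reading the whole log, the data arrays can be pulled out of the WAEvent output with standard tools. The following is a rough sketch only: the function name extract_wa_data is hypothetical, the location of striim-node.log depends on your installation, and the pattern assumes that no field value contains a closing square bracket.

```shell
# Hypothetical helper: print the data array of each WAEvent found in a log file.
# It matches on the text 'data: [...]' only, so events written by other
# applications to the same log would be listed as well.
extract_wa_data() {
  grep -o 'data: \[[^]]*]' "$1" | sed 's/^data: //'
}
```

Run against a log containing the output shown above, extract_wa_data prints one array per event, beginning with ["ID","Name"] and ["100","first"].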