CREATE EVENTTABLE

CREATE EVENTTABLE <name>
USING STREAM ( NAME: '<stream name>' )
[ DELETE USING STREAM ( NAME: '<stream name>' ) ]
QUERY ( 
  keytomap:'<key field for type name>'
  [, persistPolicy:'True' ]
)
OF <type name>;

An event table is similar to a cache, except it is populated by an input stream instead of by an external file or database. CQs can both INSERT INTO and SELECT FROM an event table.

Event tables retain only the most recent event for each value of the key field defined by keytomap. In other words, the key field is similar to a database table's primary key. The first event received for each key value adds an event to the table, and subsequent events for that key value update it.

Optionally, an event table may have a second input stream, defined by DELETE USING STREAM. Events inserted into this stream delete the events in the table with the corresponding key field values. The other values in a delete event are ignored, though they must be valid for the delete stream's type.

If persistPolicy is True, events are persisted to Elasticsearch (while still being retained in memory) and are kept through application terminations and restarts, until the application is dropped. If persistPolicy is False, event table data is lost when the application is undeployed or terminated.
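
For example, the following minimal sketch combines the clauses above, with a delete stream and persistence enabled. OrderStateTable, OrderInputStream, OrderDeleteStream, orderId, and OrderEventType are hypothetical names; substitute your own streams and type:

-- hypothetical names; follows the syntax shown above
CREATE EVENTTABLE OrderStateTable
USING STREAM ( name: 'OrderInputStream' )
DELETE USING STREAM ( name: 'OrderDeleteStream' )
QUERY (
  keytomap: 'orderId',
  persistPolicy: 'True'
)
OF OrderEventType;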

Inserting events into an event table

Use a CQ to insert events into an event table.

To demonstrate this, save the following as striim/Samples/EventTable1.csv:

id,color,value
1,green,10
2,blue,20

Then create EventTable1 in namespace ns1 and run it:

CREATE NAMESPACE ns1;
USE ns1;
CREATE APPLICATION EventTable1;

CREATE SOURCE EventTableSource USING FileReader  ( 
  Wildcard: 'EventTable1.csv',
  Directory: 'Samples',
  PositionByEof: false
) 
PARSE USING DSVParser  ( 
  header: true
) 
OUTPUT TO EventTableSource_Stream;

CREATE CQ EventTableSource_Stream_CQ 
INSERT INTO EventTableSource_TransformedStream
SELECT TO_INT(data[0]) as id,
  TO_STRING(data[1]) as color,
  TO_INT(data[2]) as value
FROM EventTableSource_Stream;

CREATE EVENTTABLE EventTableDemo USING STREAM ( 
  name: 'EventTableSource_TransformedStream'
) 
DELETE USING STREAM ( 
  name: 'ETTestDeleteStream'
) 
QUERY ( 
  keytomap: 'id'
) 
OF EventTableSource_TransformedStream_Type;

END APPLICATION EventTable1;
DEPLOY APPLICATION EventTable1;
START APPLICATION EventTable1;

Once the application has started, query the event table, and you will see that the contents match EventTable1.csv:

W (ns1) > select * from EventTableDemo;
Processing - select * from EventTableDemo
[
   id = 1
   color = green
   value = 10
]
[
   id = 2
   color = blue
   value = 20
]

-> SUCCESS 
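
A CQ can also SELECT FROM an event table, for example to enrich events arriving on another stream by looking up the entry with a matching key. The following is a minimal sketch, not part of this walkthrough: OrderStream and EnrichedStream are hypothetical, and OrderStream is assumed to carry an id field matching the event table's key.

-- hypothetical enrichment CQ joining a stream with the event table
CREATE CQ EnrichFromEventTable_CQ
INSERT INTO EnrichedStream
SELECT o.id, t.color, t.value
FROM OrderStream o, EventTableDemo t
WHERE o.id = t.id;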

Updating an event table

When an event table receives a new event with the same key field value as an existing event in the table, it updates its values.

To demonstrate this, save the following as striim/Samples/EventTable2.csv:

id,color,value
1,purple,25

Then create EventTable2, run it, and query the event table again. This application does not need its own CQ: its source outputs to EventTableSource_Stream, so it reuses the CQ from EventTable1.

CREATE APPLICATION EventTable2;

CREATE SOURCE EventTableSource2 USING FileReader  ( 
  Wildcard: 'EventTable2.csv',
  Directory: 'Samples',
  PositionByEof: false
) 
PARSE USING DSVParser  ( 
  header: true
) 
OUTPUT TO EventTableSource_Stream;

END APPLICATION EventTable2;
DEPLOY APPLICATION EventTable2;
START APPLICATION EventTable2;

select * from EventTableDemo;

The event with id 1 is updated to match the data in EventTable2.csv:

W (ns1) > select * from EventTableDemo;
Processing - select * from EventTableDemo
[
   id = 1
   color = purple
   value = 25
]
[
   id = 2
   color = blue
   value = 20
]

Deleting events from an event table using the delete stream

To delete an event, send an event with the same key field value to the event table's delete stream.

To demonstrate this, you can reuse EventTable2.csv. Create EventTable3, run it, and query the event table again:

STOP APPLICATION EventTable2;
CREATE APPLICATION EventTable3;

CREATE SOURCE EventTableDeleteSource USING FileReader  ( 
  Wildcard: 'EventTable2.csv',
  Directory: 'Samples',
  PositionByEof: false
) 
PARSE USING DSVParser  ( 
  header: true
) 
OUTPUT TO EventTableDelete_Stream;

CREATE CQ EventTableDelete_Stream_CQ 
INSERT INTO ETTestDeleteStream
SELECT TO_INT(data[0]) as id,
  TO_STRING(data[1]) as color,
  TO_INT(data[2]) as value
FROM EventTableDelete_Stream;

END APPLICATION EventTable3;
DEPLOY APPLICATION EventTable3;
START APPLICATION EventTable3;

The event with id 1 has been deleted from the event table. (As described above, the color and value fields in EventTable2.csv are ignored; only the key field id is used to identify the event to delete.)

W (ns1) > select * from EventTableDemo;
Processing - select * from EventTableDemo
[
   id = 2
   color = blue
   value = 20
]

Deleting events from an event table using convertToDeleteEvent()

When deleting events using the delete stream, a race condition can occur if the input stream and the delete stream both receive an event with the same key at almost the same time. To avoid that, use convertToDeleteEvent() instead, as follows:

CREATE CQ <name>
INSERT INTO <event table input stream>
SELECT et.convertToDeleteEvent()
FROM <event table> et
WHERE <criteria selecting events to delete>;

convertToDeleteEvent() requires an alias for the event table (et in this example); you may use any alias you wish.

For example, to delete the remaining event from the EventTableDemo event table:

STOP APPLICATION EventTable3;
CREATE APPLICATION EventTable4;

CREATE CQ EventTableConvertDelete_CQ
INSERT INTO EventTableSource_TransformedStream
SELECT et.convertToDeleteEvent()
FROM EventTableDemo et
WHERE id=2;

END APPLICATION EventTable4;
DEPLOY APPLICATION EventTable4;
START APPLICATION EventTable4;