Skip to main content

Google PubSub Writer

Writes to an existing topic in Google Cloud Pub/Sub.

Only async mode is supported. Consequently, events may be written out of order.

Google PubSub Writer properties

property

type

default value

notes

Batch Policy

String

EventCount:1000, Interval:1m, Size:1000000

Cached data is written to the target every time one of the specified values is exceeded. With the default value, data will be written once a minute or sooner if the buffer contains 1000 events or 1,000,000 bytes of data. When the application is stopped any remaining data in the buffer is discarded.

Due to google-cloud-java issue #4757, the maximum supported value for EventCount is 1000.

The MaxOutstandingRequestBytes value in PubSub Config must be equal to or higher than the Batch Policy size.;

Message Attributes

String

The Message Attributes property allows you to specify one or more key-value pairs sent as part of the PubSub message attributes to filter by subscriber. Setting this property produces messages with the Message Attributes for each event.

You can specify the value as a static string or a dynamic value from the incoming stream. 

For a WAEvent stream, you can extract the value from metadata or user data, while for a Typed stream, you can use any of the fields of the typed stream.

Note: The keys do not support special characters. They can be only alphanumeric.

Examples of static and dynamic values:

Static value:

CName="Striim"

Dynamic value:

Table=@metadata(TableName)

See Message Attributes and Ordering Key sample and client configuration.

Ordering Key

String

The Ordering Key property takes a single-string value known as a key, which is used to deliver messages to subscribers in the order in which the Pub/Sub system receives them. Setting this property produces messages with an OrderingKey for each event.

You can specify the value as a static string or a dynamic value from the incoming stream. The Ordering Key only supports a dynamic value event lookup for @metadata and @userdata.

For a WAEvent stream, you can extract the value from metadata or user data, while for a Typed stream, you can use any of the fields of the typed stream.

Note: The keys do not support special characters. They can be only alphanumeric.

Examples of static and dynamic values:

Static value:

OrderingKey: "Test1"

Dynamic value:

OrderingKey : @metadata(TableName)

See Message Attributes and Ordering Key sample and client configuration.

Project ID

String

the project to which the PubSub instance belongs

PubSub Config

String

RetryDelay: 1, MaxRetryDelay:60, TotalTimeout:600, InitialRpcTimeout:10, MaxRpcTimeout:600, RetryDelayMultiplier:2.0, NumThreads:10, MaxOutstandingElementCount:1000, MaxOutstandingRequestBytes:1000000

Do not change these values except as instructed by Striim support.

Service Account Key

String

The path (from root or the Striim program directory) and file name to the .json credentials file downloaded from Google (see Service Accounts). This file must be copied to the same location on each Striim server that will run this adapter, or to a network location accessible by all servers.

If a value for this property is not specified, Striim will use the $GOOGLE_APPLICATION_CREDENTIALS environment variable.

The service account must have the PubSub Publisher or higher role for the topic (see Access Control).

Topic

String

the topic to publish to

This adapter has a choice of formatters. See Supported writer-formatter combinations for more information.Supported writer-formatter combinations

Google PubSub Writer sample application

CREATE APPLICATION GooglePubSubWriterTest;

CREATE SOURCE PosSource USING FileReader (
  wildcard: 'PosDataPreview.csv',
  directory: 'Samples/PosApp/appData',
  positionByEOF:false )
PARSE USING DSVParser (
  header:Yes,
  trimquote:false )
OUTPUT TO PosSource_Stream;

CREATE CQ PosSource_Stream_CQ
INSERT INTO PosSource_TransformedStream
SELECT TO_STRING(data[1]) AS MerchantId,
  TO_DATE(data[4]) AS DateTime,
  TO_DOUBLE(data[7]) AS AuthAmount,
  TO_STRING(data[9]) AS Zip
FROM PosSource_Stream;

CREATE TARGET GooglePubSubTarget USING GooglePubSubWriter (
  ServiceAccountKey:'my-pubsub-cb179721c223.json',
  ProjectId:'my-pubsub',
  Topic:'mytopic'
)
FORMAT USING JSONFormatter ()
INPUT FROM PosSource_TransformedStream;

END APPLICATION GooglePubSubWriterTest;

Message Attributes and Ordering Key sample and client configuration

This sample adds the primary key value of the source table in Message Attributes and OrderingKey:

CREATE OR REPLACE TARGET pubsubtest USING Global.GooglePubSubWriter ( 
  BatchPolicy: 'EventCount:1000,Interval:1m,Size:1000000',   
  MessageAttributes: 'CompanyName=\"Example.com Inc.\"',   
  Topic: 'OrderingKeyTest2', 
  ServiceAccountKey: '/Users/example/Documents/striimdev-612345678a5b.json', 
  ProjectId: 'striimdev',   
  adapterName: 'GooglePubSubWriter',   
  OrderingKey: 'Test1', 
  PubSubConfig: 'RetryDelay:1,MaxRetryDelay:60,TotalTimeout:600,
    InitialRpcTimeout:10,MaxRpcTimeout:10,RetryDelayMultiplier:2.0,
    RpcTimeoutMultiplier:1.0,NumThreads:10,MaxOutstandingElementCount:1000,
    MaxOutstandingRequestBytes:1000000' ) 

FORMAT USING Global.JSONFormatter  (   
  handler: 'com.webaction.proc.JSONFormatter',   
  jsonMemberDelimiter: '\n',   
  EventsAsArrayOfJsonObjects: 'true',   
  formatterName: 'JSONFormatter',   
  jsonobjectdelimiter: '\n' ) INPUT FROM DSVOP;

END APPLICATION FileReadertopubsub;

You can set the message ordering property when you create a subscription using the Google Cloud console, the Google Cloud CLI, or the Pub/Sub API (see Ordering messages). For example:

Subscription subscription =          
  subscriptionAdminClient.createSubscription(              
    Subscription.newBuilder()                  
      .setName(subscriptionName.toString())                  
      .setTopic(topicName.toString())   
  // Set message ordering to true for ordered messages in the subscription.                  
      .setEnableMessageOrdering(true)                  
      .build());