Building pipelines with GCS Reader

You can read from Google Cloud Storage using the GCS Reader and write to any target supported by Striim. Typically, you will set up pipelines in two phases—initial load, followed by continuous incremental replication—as explained in Pipelines.

  • For initial load, use GCS Reader in Initial Load mode to create a point-in-time copy of the selected objects (files) at the target.

  • After initial load completes, start continuous replication: read objects created or modified after the initial load began and write those changes to the target. For near-real-time replication of new source data, use the GCS Reader in Incremental mode and set an appropriate polling interval, as in the sketch below.
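
For example, a minimal incremental source in TQL might combine the detection mode, polling interval, and starting offset as follows. This is a sketch: PollingInterval and StartTimestamp are property names inferred from the Flow Designer labels later in this section, and the value formats are assumptions, so verify both against the GCS Reader reference for your Striim version.

CREATE SOURCE GCSReader_Incremental USING Global.GCSReader (
  ProjectId: '<example-project>',
  BucketName: '<example-bucket>',
  ServiceAccountKey: '/user/striim/accesskey/GCSuser_access_Key.json',
  ObjectDetectionMode: 'GCSDirectoryListing',
  -- Assumed property names and formats: poll for new or changed objects
  -- periodically, starting from when the initial load began.
  PollingInterval: '5min',
  StartTimestamp: '2024-01-01 00:00:00'
)
PARSE USING Global.JSONParser ()
OUTPUT TO GCSIncrementalStream;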

You can use automated pipeline wizards to build GCS pipelines that Striim manages end-to-end, or you can manually create separate applications for initial load and continuous incremental replication and manage the lifecycle yourself. Since the GCS Reader supports both initial load and continuous replication, you can handle both with a single application.

Using an automated pipeline wizard: if you want to build near-real-time pipelines from Google Cloud Storage and write to a supported target, we recommend using an automated pipeline wizard with a GCS Reader source. These wizards perform the following steps automatically:

  • Create two applications: one for initial load and the other for continuous incremental replication (polling).

  • Create a schema and tables in the target that match the data to be synced from the bucket (based on your parser configuration and mapping).

  • Run the initial-load application to copy existing data from the bucket to the target.

  • When initial load completes, run the incremental application to replicate new or changed objects using the configured object detection mode and polling interval.

Not using an automated pipeline wizard: if your use case or policies do not allow using an automated pipeline, create separate applications for initial load and continuous replication:

  • Before performing initial load, identify a stable, monotonically increasing watermark for change detection—typically the object’s last-modified time (for GCSDirectoryListing) or the event time from Audit Logs (for GCSAuditLogNotification).

  • Create a schema and tables in the target and perform initial load: use a wizard with a GCS Reader source. (Alternatively, you may pre-create target tables using native or third-party utilities.)

  • Perform an initial load when the schema and tables already exist in the target: use a wizard with a GCS Reader source configured for full export.

  • Switch from initial load to continuous replication: if needed, provide the last successful watermark (for example, the last processed modified timestamp) as the starting offset by setting Start Timestamp.

  • Replicate new data: use a wizard with a GCS Reader source configured for incremental polling (set the object detection mode and polling interval). Configure the target for upsert semantics using appropriate key columns.
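
For the upsert step above, a hedged sketch of a BigQuery target in merge mode follows. The BigQueryWriter property names, the 'MERGE' mode value, and the KeyColumns clause inside Tables are assumptions based on common Striim writer conventions, and GCSIncrementalStream is the hypothetical stream from the earlier incremental sketch; check the BigQuery Writer reference for the exact syntax.

CREATE TARGET BigQueryMergeTarget USING Global.BigQueryWriter (
  ServiceAccountKey: '/user/striim/accesskey/GCSuser_access_Key.json',
  ProjectId: '<example-project>',
  -- Assumed: MERGE mode upserts rows, with KeyColumns identifying the match keys
  Mode: 'MERGE',
  Tables: 'source.orders,example_dataset.orders KeyColumns(order_id)'
)
INPUT FROM GCSIncrementalStream;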

Alternatively, instead of using wizards, you can create applications using Flow Designer, TQL, or Striim’s REST API.

Prerequisite — The initial setup and configuration you perform in Google Cloud are described in the Initial setup section.

Create a pipeline using Flow Designer

  1. In the Striim UI, create a new pipeline and add a Source.

  2. Select Cloud storage > GCS Reader.

  3. Specify connection properties:

    • Project Id — Google Cloud project for the bucket.

    • Bucket Name — bucket to read from.

    • Service Account Key — path to the JSON credentials file (or rely on $GOOGLE_APPLICATION_CREDENTIALS).

  4. Scope the objects to read (optional):

    • Folder Name and Include Subfolders.

    • Object Filter (for example, *.json).

  5. Choose how to detect changes:

    • Object Detection Mode: GCSDirectoryListing (default) or GCSAuditLogNotification.

    • Set Polling Interval, and optionally a Start Timestamp for incremental start.

  6. Select how to fetch and parse files:

    • Enable Use Streaming (recommended) to read objects without downloading them first. Disable it to download files before parsing (required for Parquet), and adjust Download Policy as needed.

    • Choose a parser (for example, JSON, DSV, Avro, Parquet) and configure parser options.

  7. Add a target (for example, BigQuery, Snowflake, or a database/file writer) and complete any field mapping.

  8. Validate, save, and run the pipeline. Monitor source and target components to confirm expected throughput and events.
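
Taken together, the properties from steps 3 through 6 map roughly onto a TQL source like the one below. The property names are inferred from the UI labels (FolderName, IncludeSubfolders, UseStreaming, and the polling settings in particular are assumptions), so confirm them against the adapter reference before relying on this sketch.

CREATE SOURCE GCSReader_Scoped USING Global.GCSReader (
  ProjectId: '<example-project>',
  BucketName: '<example-bucket>',
  ServiceAccountKey: '/user/striim/accesskey/GCSuser_access_Key.json',
  -- Scope: read only JSON objects under one folder, including subfolders
  FolderName: 'landing/',
  IncludeSubfolders: true,
  ObjectFilter: '*.json',
  -- Change detection: directory listing polled on an interval
  ObjectDetectionMode: 'GCSDirectoryListing',
  PollingInterval: '5min',
  -- Fetching: stream object contents rather than downloading files first
  UseStreaming: true
)
PARSE USING Global.JSONParser ()
OUTPUT TO GCSScopedStream;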

Note

When using GCSAuditLogNotification, ensure that Data Access audit logs are enabled for Cloud Storage and that the service account has the required logging permissions.

Create a pipeline using TQL

In TQL, the adapter is referenced as Global.GCSReader and parsers are referenced from the Global namespace (for example, Global.JSONParser).

CREATE OR REPLACE APPLICATION GCS_Json;

CREATE SOURCE GCSReader_Json USING Global.GCSReader (
  ProjectId: '<example-project>',
  BucketName: '<example-bucket>',
  ServiceAccountKey: '/user/striim/accesskey/GCSuser_access_Key.json',
  ObjectDetectionMode: 'GCSDirectoryListing',
  ObjectFilter: '*.json'
)
PARSE USING Global.JSONParser ()
OUTPUT TO GCSOutput;

CREATE TARGET SysOut USING Global.SysOut ( name: 'sysout' )
INPUT FROM GCSOutput;

END APPLICATION GCS_Json;
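
After saving the application, you can deploy and start it from the Striim console with the standard application lifecycle commands:

DEPLOY APPLICATION GCS_Json;
START APPLICATION GCS_Json;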

Note

For Parquet files, set UseStreaming: false and configure DownloadPolicy to ensure the entire file is available before parsing.
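
A minimal sketch of such a source follows. Global.ParquetParser is assumed to be the parser name, and the exact UseStreaming and DownloadPolicy property names and values should be confirmed in the adapter and parser references; DownloadPolicy is omitted here because its valid values depend on your Striim version.

CREATE SOURCE GCSReader_Parquet USING Global.GCSReader (
  ProjectId: '<example-project>',
  BucketName: '<example-bucket>',
  ServiceAccountKey: '/user/striim/accesskey/GCSuser_access_Key.json',
  ObjectFilter: '*.parquet',
  -- Parquet needs the whole file available before parsing, so disable streaming
  UseStreaming: false
)
PARSE USING Global.ParquetParser ()
OUTPUT TO GCSParquetStream;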