Declarative, Fully Managed Data Streaming Pipelines

Table of Contents

Data pipelines can be tricky business —failed jobs, re-syncs, out-of-memory errors, complex jar dependencies, making them not only messy but often disastrously unreliable. Data teams want to innovate and do awesome things for their company, but they get pulled back into firefighting and negotiating technical debt. 

The Power of Declarative Streaming Pipelines

Declarative pipelines allow developers to specify what they want to achieve in concise, expressive code. This approach simplifies the creation, management, and maintenance of data pipelines. With Striim, users can leverage SQL-based configurations to define source connectors, target endpoints, and processing logic, making the entire process intuitive and accessible, all while delivering highly consistent, real-time data applied as merges and append-only change-records.

How Striim pipelines work

Striim pipelines streamline the process of data integration and real-time analytics. A pipeline starts with defining a source, which could be a database, log, or other data stream. Striim supports advanced recovery semantics such as A1P (At Least Once Processing) or E1P (Exactly Once Processing) to ensure data reliability.

You can see the full list of connectors stream supports here. →

The data flows into an output stream, which can be configured to reside in-memory for low-latency operations or be Kafka-based for distributed processing. Continuous queries on these materialized streams allow real-time insights and actions. Windowing functions enable efficient data aggregation over specific timeframes. As the data is processed, it is materialized into downstream targets such as databases or data lakes. Striim ensures data accuracy by performing merges with updates and deletes, maintaining a true and consistent view of the data across all targets.

Here we’ll look at an application that reads data from Stripe, replicates data to BigQuery in real-time, then while the data is in flight, detect declined transactions and send slack alerts in real-time. 

Striim’s Stripe Analytics Pipeline

Let’s dive into a practical example showcasing the power and simplicity of Striim’s streaming pipelines. The following is a Stripe analytics application that reads data from Stripe, processes it for fraudulent transactions, and generates alerts.

Application Setup

The first step is to create an application that manages the data streaming process. The `StripeAnalytics` application is designed to handle Stripe data, process fraudulent transactions, and generate alerts. 

This statement initializes the `StripeAnalytics` application with an automatic recovery interval of 5 seconds, ensuring resilience and reliability. You can see we define ‘Recovery 5 second interval’. This handles the checkpointing for at least once or exactly once deliver from source to target with transactional support.

Reading from Stripe

Next, we define the source to read data from Stripe. The `StripeReaderSource` reads data at 10-second intervals and outputs it to a stream called `StripeStream`. The ‘automated’ mode denotes that schemas will be propagated to downstream targets (data warehouses, databases), an initial load of historical data will load, before starting the live CDC. 

				
					CREATE OR REPLACE APPLICATION StripeAnalytics RECOVERY 5 SECOND INTERVAL;
-- automatic recovery for A1P delivery to Slack alerts

				
			
				
					CREATE OR REPLACE SOURCE StripeReaderSource USING Global.StripeReader (interval:10s, mode:automated)
OUTPUT TO StripeStream PERSIST using ConfluentKafkaProperties;
-- Connect to Stripe API, pull all relevant objects on a 10 second refrequency
-- 'automated' mode for schema creation, initial load, and CDC
-- optionally, all streams can be persisted to your Kafka cluster for recovery

				
			
Striim streams are in-memory by default, but can be backed up to our managed Kafka or to your external Confluent Kafka cluster. 

Writing to BigQuery

The processed data is then written to BigQuery for storage and analysis. The `BigQueryWriter` takes data from the `StripeStream` and loads it into BigQuery, automatically creating the necessary schemas.
				
					CREATE OR REPLACE TARGET BigQueryWriter USING Global.BigQueryWriter ()
INPUT FROM StripeStream;
-- ELT all Stripe data into BigQuery, automatically creates schemas

				
			

Fraud Detection

To detect fraudulent transactions, we use a jumping window to keep the last two rows for each `customer_id`. This window is defined over a stream called `FraudulentStream`.

				
					CREATE OR REPLACE JUMPING WINDOW FraudulentWindow OVER FraudulentStream
KEEP 2 ROWS PARTITION BY customer_id;

				
			

A continuous query (`FetchFraudulentTransactions`) is then created to pull declined transactions from the `StripeStream` and insert them into the `FraudulentStream`.

				
					CREATE OR REPLACE CQ FetchFraudulentTransactions
INSERT INTO FraudulentStream
SELECT TO_STRING(GETDATA(e, "CUSTOMERID")) as customer_id,
TO_STRING(GETDATA(e, "FAILURECODE")) as failure_code
FROM StripeStream s
where META(s, "TableName").equals("Charges") and
GETDATA(s, "FAILURECODE") is not null and
TO_STRING(GETDATA(s, "FAILURECODE")).contains("card_declined");

				
			

Generating Alerts

To notify the relevant parties about fraudulent transactions, we generate alert events. The `GenerateAlertEvents` query groups the declined transactions by `customer_id` and inserts them into the `FraudulentAlertStream`.

				
					CREATE OR REPLACE CQ GenerateAlertEvents
INSERT INTO FraudulentAlertStream
SELECT 'Declined Transaction by CustomerID ' + customer_id,  customer_id + '_' + DNOW(), 'warning', 'raise',
' Declined transaction attempted by customer with ID : ' + customer_id
FROM FraudulentWindow
GROUP BY customer_id;

				
			

Sending Alerts to Slack

Finally, we create a subscription to send these alerts to a Slack channel. The `FraudulentWebAlertSubscription` uses the `SlackAlertAdapter` to deliver alerts to the specified channel.

				
					CREATE OR REPLACE SUBSCRIPTION FraudulentWebAlertSubscription USING SlackAlertAdapter (
  channelName: 'john_936_21813_FraudulentWebAlertSubscription',
  isSubscription: 'true' )
INPUT FROM FraudulentAlertStream;

				
			

Completing the Application

We conclude the application definition with the `END APPLICATION` statement.

				
					END APPLICATION StripeAnalytics;

				
			

Striim’s declarative approach offers several benefits:

  1. Simplicity: SQL-based streaming pipelines make it easy to define rich processing logic.
  2. Scalability: Striim handles large volumes of data efficiently by scaling up and horizontally as a fully managed service, ensuring real-time processing and delivery.
  3. Flexibility: The ability to integrate with various data sources and targets provides unparalleled flexibility. 
  4. Reliability: Built-in recovery mechanisms ensure data integrity and continuous operation.
  5. Consistency: Striim delivers consistent data across all targets, maintaining accuracy through precise merges, updates, and deletes.
  6. Portability: Striim can be deployed in our fully managed service, or run in your own cloud a multi-node cluster containerized with kubernetes. 

Monitoring and Reliability

Are your reports stale? Is data missing or inconsistent? Striim allows you to drill down into these metrics and gives you out-of-the-box slack alerts so your pipelines are always running on autopilot.

 
 

Conclusion

Striim’s declarative, fully managed data streaming pipelines empower your data team to harness the power of real-time data. By simplifying the process of creating and managing data pipelines, Striim enables organizations to focus on deriving insights and driving value from their data. The Stripe analytics application is a prime example of how Striim can streamline data processing, replication and alerting on anomalies, making it an invaluable tool for modern data-driven enterprises. You can try Striim for free. No credit card, no sales call, just 14 days of fast data striiming.