Salesforce CDC Reader
Note
This adapter is available in Striim Cloud Enterprise and Striim Cloud Mission Critical, and as a trial adapter in Striim Developer.
For an introduction to reading from Salesforce, see Salesforce.
Salesforce offers an event-driven change data capture (CDC) option that gives customers real-time updates of Salesforce tables, including notification of schema changes.
The Salesforce CDC Reader ingests Salesforce streaming data and emits Striim WAEvents that can be processed with continuous queries or directed to a Striim target. The Striim Salesforce CDC Reader uses the Salesforce CDC (Change Data Capture) APIs to provide near-real-time updates of changes to the underlying data. The Striim Salesforce CDC Reader supports all custom objects and standard objects that follow the Salesforce StandardObjectNameChangeEvent model.
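As a rough illustration of the naming model mentioned above, the following Python sketch derives the change event name and channel for an object. It assumes the Salesforce convention that a standard object such as Account maps to AccountChangeEvent, that a custom object such as Employee__c maps to Employee__ChangeEvent, and that each change event has a channel under /data/. The function names are hypothetical and are not part of Striim or Salesforce.

```python
def change_event_name(sobject: str) -> str:
    """Return the assumed change event name for a standard or custom object."""
    if sobject.endswith("__c"):
        # Custom objects: replace the "__c" suffix with "__ChangeEvent".
        return sobject[: -len("__c")] + "__ChangeEvent"
    # Standard objects: append "ChangeEvent" to the object name.
    return sobject + "ChangeEvent"


def change_event_channel(sobject: str) -> str:
    """Return the assumed single-entity channel for the object's change events."""
    return "/data/" + change_event_name(sobject)


for name in ("Account", "Employee__c"):
    print(name, "->", change_event_channel(name))
```

Check the Salesforce documentation for objects that deviate from this pattern before relying on it.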
Typical use cases for reading from Salesforce include:
Integration with CDP: Move data from Salesforce CRM in real time to a data warehouse to build a Customer Data Platform
Analytics workflow: Integrate Salesforce CRM data with ML/analytics systems for workflows like Next best action/Offer, LTV analysis and churn analysis
Striim offers several adapters to read from Salesforce. For more information on how to use the different Salesforce adapters for different use cases, see Salesforce Readers.
You can read from Salesforce as follows, and write to any target supported by Striim.
For initial load, you can use the Salesforce Reader.
For near-real-time continuous replication after the initial load, you can use the Salesforce CDC Reader, which reads new source data as soon as Salesforce emits a change event.
If you already use the Salesforce Reader in several Striim apps for incremental load and want to make data transfer closer to real time, see "Updating existing apps to use Salesforce CDC".
Summary
APIs used/data supported | Salesforce CDC (Streaming API) using CometD. See the Salesforce documentation to learn more. |
Data supported |
|
Supported targets |
|
Security and authentication | OAuth based authentication |
Operations supported | Incremental load |
Schema management | The Salesforce CDC Reader does not support initial schema creation at the target. Use the Salesforce Reader to create the initial schema. The Salesforce CDC Reader supports schema evolution for selected schema changes. |
Resilience / recovery | Configurable automatic retries |
Performance | Striim Platform supported parallel execution |
Programmability |
|
Metrics and auditing | Key metrics available through Striim monitoring |
Key limitations |
|
Salesforce CDC Reader initial setup
Prerequisites
Before you can start using the Salesforce CDC Reader in Striim applications, review and complete the following prerequisites:
Determine the sObjects and custom objects that you want the Salesforce CDC Reader to read from. Refer to the Salesforce documentation to check whether the selected sObjects support CDC.
Verify that your Salesforce agreement provides you the required entitlement for the number of objects you want to use Salesforce CDC to read from. (See the related Salesforce documentation).
Verify also that your API quotas will be sufficient for the expected event rate for the objects you are enabling CDC for. (See the related Salesforce documentation).
Configuring Salesforce
You must have appropriate permissions on the Salesforce dashboard to complete these configurations.
Configuring Salesforce CDC channel
In a single Striim application, the Salesforce CDC Reader can read from one channel. Each channel can include events for multiple objects (including a combination of sObjects and custom objects). You can design the channel coverage according to your needs, including any governance requirements. Once you have finalized the channel-object mapping, configure the required channels in the Salesforce dashboard.
Configuring OAuth for Salesforce CDC Reader
You can configure the Salesforce CDC Reader for OAuth authentication in either of two ways. With an authentication token, you first generate the token using the Salesforce OAuth service, then set its value in the Salesforce CDC Reader in Striim. With a JWT Bearer token, you configure the Salesforce instance, then configure the Striim app's access.
The following steps configure OAuth with an authentication token.
Generate an authentication token using the following command:
curl https://login.salesforce.com/services/oauth2/token \
  -d "grant_type=password" \
  -d "client_id=<your consumer key>" \
  -d "client_secret=<your consumer secret>" \
  -d "username=<your username>" \
  -d "password=<your password>"
Note the value of the authentication token.
In the Salesforce CDC Reader, set the value of the authentication token.
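For readers who prefer to script the token request, the steps above can be sketched in Python using only the standard library. The sketch builds the same form-encoded POST as the curl command and pulls the token out of a JSON response. The function names are hypothetical, the sample response is a placeholder rather than real output, and the sketch assumes the response carries the token in an access_token field.

```python
import json
import urllib.parse
import urllib.request

TOKEN_URL = "https://login.salesforce.com/services/oauth2/token"


def build_token_request(consumer_key, consumer_secret, username, password):
    """Build (but do not send) the password-grant token request."""
    form = {
        "grant_type": "password",
        "client_id": consumer_key,
        "client_secret": consumer_secret,
        "username": username,
        "password": password,
    }
    body = urllib.parse.urlencode(form).encode("utf-8")
    return urllib.request.Request(TOKEN_URL, data=body, method="POST")


def extract_access_token(response_text):
    """Return the token from a JSON token response (assumed field name)."""
    return json.loads(response_text)["access_token"]


request = build_token_request(
    "<your consumer key>", "<your consumer secret>",
    "<your username>", "<your password>",
)
# Sending is left to the caller, for example:
#   with urllib.request.urlopen(request) as response:
#       token = extract_access_token(response.read().decode("utf-8"))
placeholder_response = '{"access_token": "PLACEHOLDER_TOKEN"}'
print(extract_access_token(placeholder_response))
```

Keeping request construction separate from sending makes it possible to inspect the request before any credentials leave the machine.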
The following procedures configure OAuth to use a JWT Bearer token. The first procedure configures the Salesforce instance.
Log in to the Salesforce instance.
In Security Controls > Certificate and Key Management, click Create self-signed certificate.
Download the certificate to your local storage.
Click Export to Keystore and enter a password.
The certificate downloads to your local storage in the Java Keystore (JKS) format. Note the password for later use.
Click Create > Apps and create a new connected app that uses the downloaded self-signed certificate.
See Salesforce documentation for more information about creating connected apps. The app must have the refresh_token and offline_access scopes.
Enter the authorization URL https://login.salesforce.com/services/oauth2/authorize with the following query string parameters into your web browser's address bar, press Enter, and accept the access authorization request.
client_id: the consumer key of the connected app.
redirect_uri: https://login.salesforce.com/services/oauth2/success
response_type: code
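The authorization URL and its query string parameters can be assembled as in the following Python sketch. It uses only the values listed above. The function name is hypothetical, and the consumer key shown is a placeholder.

```python
from urllib.parse import urlencode

AUTHORIZE_URL = "https://login.salesforce.com/services/oauth2/authorize"
REDIRECT_URI = "https://login.salesforce.com/services/oauth2/success"


def build_authorize_url(consumer_key: str) -> str:
    """Return the URL to paste into the browser's address bar."""
    params = {
        "client_id": consumer_key,     # consumer key of the connected app
        "redirect_uri": REDIRECT_URI,  # percent-encoded by urlencode
        "response_type": "code",
    }
    return AUTHORIZE_URL + "?" + urlencode(params)


print(build_authorize_url("<your consumer key>"))
```

Using urlencode avoids hand-escaping the redirect URI, which contains characters that must be percent-encoded in a query string.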
The second procedure configures the Striim app for JWT-mediated access.
In the Flow Designer, set the value of Auto Auth Token Renewal to true and OAuth Authorization Flows to JWT_BEARER.
Type the values for the Salesforce account username and consumer key.
Type the path to the JKS-format key store exported in the previous procedure and provide the password used at time of export.
Type the name to use as a unique alias for the JWT Certificate.
Add any other required properties for the Striim app and start the app.
The Salesforce CDC Reader uses a JWT certificate to authenticate to Salesforce.
Create a Salesforce CDC Reader application
To start using the Salesforce CDC Reader in Striim apps, you can create an application in the following ways:
Create a Salesforce CDC Reader application using a wizard
In Striim, select Apps > Create New, enter Source: Salesforce CDC in the search bar, click the kind of application you want to create, and follow the prompts. See Creating apps using wizards for more information.
Create a Salesforce CDC Reader application using TQL
CREATE OR REPLACE APPLICATION SalesforceCDC;

CREATE SOURCE SalesForceCDC USING Global.SalesforceCDCReader (
  autoAuthTokenRenewal: false,
  Password_encrypted: 'true',
  securityToken_encrypted: 'true',
  Username: '',
  JWTKeystorePath: '',
  fetchSize: '1000',
  apiEndPoint: 'https://***.salesforce.com',
  authToken: '*****',
  JWTKeystorePassword_encrypted: 'true',
  consumerSecret: '',
  OAuthAuthorizationFlows: 'PASSWORD',
  ThreadPoolCount: '0',
  authToken_encrypted: 'true',
  securityToken: '',
  Password: '',
  JWTCertificateName: '',
  CDDLAction: 'Process',
  startPosition: 'LATEST',
  connectionProfileName: '',
  CDDLCapture: true,
  useConnectionProfile: false,
  consumerKey: '',
  startTimeStamp: '23456789',
  connectionRetryPolicy: 'retryInterval=30, maxRetries=3',
  consumerSecret_encrypted: 'true',
  sObjects: 'Employee__c',
  JWTKeystorePassword: ''
)
OUTPUT TO mystream;

END APPLICATION SalesforceCDC;
Create a Salesforce CDC Reader application using the Flow Designer
Salesforce CDC Reader programmer's reference
Salesforce CDC Reader properties
Name | Type | Default | Description |
---|---|---|---|
API endpoint | Text | | The endpoint URL of the customer account in Salesforce. |
Auth Token | Password | | The access token. |
Auto Auth Token Renewal | Toggle | false | When set to true, the adapter automatically renews the access token, enabling the application to run continuously. |
CDDL action | Select list | Process | Handles schema evolution. |
CDDL capture | Boolean | false | Specifies whether to capture changes in the schema definitions of source objects. |
Connection retry policy | Text | retryInterval=30, maxRetries=3 | With the default setting, if a connection attempt is unsuccessful, the adapter will try again in 30 seconds ( |
Consumer | Text | | Consumer. |
Consumer Key | Text | | Consumer key of the connected app in Salesforce. |
Custom channel | Text | | A user-specified custom channel to which the adapter subscribes. When a custom channel is provided, the adapter subscribes only to the custom channel and does not read from any standard channel. The adapter reads from all sObjects that are subscribed to the custom channel and listed in the sObjects property. When the specified sObjects are not valid for the custom channel, the app halts. |
Exclude Objects | Text | | When objects are specified with a wildcard, objects in this property are excluded from sync. The Exclude Objects property does not support wildcards. Specify objects as a semicolon ( |
Fetch size | Text | 1000 | Specifies the maximum number of events to fetch from the event bus in a single operation. Larger numbers of events are transmitted as a Striim event. |
JWT Certificate name | Text | | Alias name for the certificate. |
JWT Keystore password | Password | | Password to the JWT keystore. |
JWT Keystore path | Text (Choose a file) | | Path to the JWT keystore. |
OAuth Authorization Flows | Select list | PASSWORD | Select an authorization flow option from the list. |
Password | Text | | Password to the account on the Salesforce instance. |
ReplayID | Long | | A user-provided replayID. |
Secret | | | Secret of the connected app. |
Security token | Password | | Security token for the user. |
sObjects | Text | | Objects whose CDC events the adapter subscribes to. Supports wildcards. Do not modify this property when CDDL Capture is True or recovery is enabled for the application. Use the Salesforce metadata API call to determine which objects support CDC events. |
Start position | Select list | Latest | Select Earliest for historical data within the channel's retention window. Select Latest for the most recent data from app start time. Select replayID to replay from the specified replayID. |
Start time stamp | Long | | When this property is not specified, only new data is read. Optionally, specify the timestamp from which the adapter will read. The property accepts any value greater than 0 in epoch time. Negative values, which represent dates before 1970, are not currently supported. |
Thread pool count | Integer | 0 | Configure the size of the thread pool to optimize performance for multiple objects. |
Username | Text | | Username of an account on the Salesforce instance. |
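The Connection retry policy value is a comma-separated list of key=value settings. The following Python sketch shows one way to read such a string, for example when validating application settings before deployment. The parser is a hypothetical helper, not part of Striim, and it assumes that every value is an integer.

```python
def parse_retry_policy(policy: str) -> dict:
    """Split a 'key=value, key=value' string into a dict of integers."""
    settings = {}
    for part in policy.split(","):
        key, separator, value = part.partition("=")
        if not separator:
            raise ValueError(f"expected key=value, got {part!r}")
        settings[key.strip()] = int(value.strip())
    return settings


policy = parse_retry_policy("retryInterval=30, maxRetries=3")
print(policy["retryInterval"], policy["maxRetries"])
```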
Salesforce CDC Reader WAEvent structure
WAEvent{
  data: ["a0F5j00000YO85TEAT","Debargha",300.0,"2022-07-16T14:04:27.000Z", "data1_api"]
  metadata: {"LastModifiedDate":1690362076000,"sequenceNumber":1,"CustomObject":true,
    "OperationName":"Insert","changeType":"CREATE","changedFields":[],
    "commitNumber":669201256067,"commitUser":"0055j000004c0kjAAA",
    "TableName":"CDC2__c","OwnerId":"0055j000004c0kjAAA",
    "CreatedById":"0055j000004c0kjAAA","entityName":"CDC2__c",
    "schemaId":"3C3stds59aFv7UErTa9p6w","CreatedDate":1690362076000,
    "changeOrigin":"com/salesforce/api/soap/58.0;client=Workbench/",
    "transactionKey":"0002071e-9f78-0f80-5bdf-15390e14a364",
    "replayId":32001271,"LastModifiedById":"0055j000004c0kjAAA",
    "commitTimestamp":1690362076000,
    "recordIds":["a0F5j00000YO85TEAT"]}
  userdata: null
  before: null
  dataPresenceBitMap: "Hw=="
  beforePresenceBitMap: "AA=="
  typeUUID: {"uuidstring":"01ee2b83-ef64-92c1-a550-e67c0573b1bb"}
};
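To make the sample event easier to interpret, the following Python sketch reads a few of its metadata fields and decodes the presence bitmaps. It rests on two assumptions that the sample is consistent with but that this page does not state: that commitTimestamp is in milliseconds since the Unix epoch, and that each set bit in a Base64-encoded presence bitmap marks one populated field. The function names are hypothetical.

```python
import base64
from datetime import datetime, timezone

# A subset of the metadata from the sample WAEvent above.
metadata = {
    "OperationName": "Insert",
    "changeType": "CREATE",
    "TableName": "CDC2__c",
    "replayId": 32001271,
    "commitTimestamp": 1690362076000,
    "changedFields": [],
}


def commit_time(meta: dict) -> datetime:
    """Convert commitTimestamp to UTC (assumes milliseconds since epoch)."""
    return datetime.fromtimestamp(meta["commitTimestamp"] / 1000, tz=timezone.utc)


def count_present_fields(bitmap_b64: str) -> int:
    """Count set bits in a Base64 bitmap (assumes one bit per field)."""
    raw = base64.b64decode(bitmap_b64)
    return sum(bin(byte).count("1") for byte in raw)


print(metadata["OperationName"], metadata["TableName"], commit_time(metadata).isoformat())
print(count_present_fields("Hw=="), count_present_fields("AA=="))
```

Under these assumptions, the data bitmap "Hw==" has five bits set, matching the five values in the sample's data array, and the before bitmap "AA==" has none, matching before: null.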
Salesforce CDC Reader data type support and correspondence
Striim type | Salesforce data type |
---|---|
java.lang.Object | base64 |
String | boolean |
Byte | byte |
org.joda.time.LocalDate | date |
org.joda.time.DateTime | dateTime |
Double | double |
Long | int |
String | string |
String | time |
Salesforce CDC Reader runtime considerations
Metrics monitored by the Striim Salesforce CDC Reader
The Striim Salesforce CDC Reader monitors the following metrics.
Metric | Description |
---|---|
Read timestamp | Timestamp of the most recent record read by the adapter. |
Number of deletes | Number of delete operations |
Number of inserts | Number of insert operations |
Number of updates | Number of update operations |
Max CPU rate | Maximum CPU rate across all nodes |
Max CPU rate per node | Maximum CPU usage per node |
Server count | Number of nodes in the Striim cluster |
Number of events | Number of events seen by the Striim platform |
Source input | |
Source rate | |
Input | |
Input rate | |
Rate | |
Last event position | Timestamp and ID of the most recent event |
Last observed timestamp | The latest commit timestamp. The last observed timestamp value will be "Not Ready" if the app is not started. |
Latest activity | |
Read lag | |
Table name | Name of the table |
Total rows | Total number of rows handled by the table |
Rows read | Total number of rows read by the source |
Number of subscribed objects | Total number of objects that are subscribed to receive events |
Number of events delivered | Total number of events delivered by CometD clients |
Number of concurrent CometD subscribers | Number of concurrent CometD subscribers |