Databricks Writer
Databricks Writer writes to Delta Lake tables in Databricks on AWS or Azure. Delta Lake is an open-source tabular storage framework that includes a transaction log to support features such as ACID transactions and optimistic concurrency control typically associated with relational databases.
For more information, see:
What is Delta Lake? for AWS or What is Delta Lake? for Azure
Databricks on AWS and Databricks documentation for Amazon Web Services on databricks.com, and Databricks on AWS on aws.amazon.com
Azure Databricks on databricks.com, and Azure Databricks and Azure Databricks documentation on microsoft.com
The required JDBC driver is bundled with Striim.
Limitations
Writing to Databricks requires a staging area. The native Databricks File System (DBFS) has a 2 GB cap on storage, which can cause file corruption. To work around that limitation, we strongly recommend using an external stage instead: Azure Data Lake Storage (ADLS) Gen2 for Azure Databricks or Amazon S3 for Databricks on AWS. To use an external stage, your Databricks instance must use Databricks Runtime 10.4 or later.
If you will use MERGE mode, we strongly recommend partitioning your target tables, as this significantly improves performance (see Partitions | Databricks on AWS or Learn / Azure / Azure Databricks / Partitions).
Data is written in batch mode. Streaming mode is not supported in this release because it is not supported by Databricks Connect (see Databricks Connect - Limitations).
Creating a Databricks target using a template
Note
In this release, Auto Schema Creation is not supported when you are using Databricks' Unity Catalog.
When you create a Databricks Writer target using a wizard template (see Creating apps using templates), you must specify three properties: Connection URL, Hostname, and Personal Access Token. The Tables property value will be set based on your selections in the wizard.
Databricks does not have schemas. When the source database uses schemas, the tables will be mapped as <source_database>.<source_schema>.<table>,<target_database>.<table>, for example, mydb.myschema.%,mydb.%. Each schema in the source will be mapped to a database in the target. If the databases do not exist in the target, Striim will create them.
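For example, a target using such a wildcard mapping might look like the following sketch (the connection values and names are illustrative placeholders; all property names appear in the samples later in this page):
-- maps every table in source schema mydb.myschema to a same-named table in target database mydb
CREATE TARGET DatabricksWildcardExample USING DeltaLakeWriter (
  personalAccessToken: '*************************',
  hostname: 'adb-xxxx.xx.azuredatabricks.net',
  connectionUrl: 'jdbc:xxx.xx;transportMode=http;ssl=1;httpPath=xxx;AuthMech=3;UID=token;',
  stageLocation: '/StriimStage/',
  tables: 'mydb.myschema.%,mydb.%'
) INPUT FROM ns1.sourceStream;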
Databricks authentication mechanisms
Databricks authentication requires a personal access token. Optionally, you may authenticate using Azure Active Directory (Azure AD).
Create a personal access token in Databricks on AWS
Create a personal access token for Striim to use to authenticate with the Databricks cluster (see Documentation > Developer tools and guidance > Authentication for Databricks automation > Databricks personal access token authentication > Databricks personal access tokens for users).
Grant the user associated with the token view and create permissions on the Databricks File System (see Documentation > Security and compliance guide > Authentication and access control > Access control > Workspace object access control > Folder permissions).
If table access control has been enabled, also grant the user MODIFY and READ_METADATA privileges (see Documentation > Databricks reference documentation > Language-specific introductions to Databricks > SQL language reference > Privileges and securable objects in the Hive metastore > Privilege types).
Create a personal access token in Azure Databricks
Create a personal access token for Striim to use to authenticate with the Databricks cluster as described in Learn / Azure / Azure Databricks / Authentication for Azure Databricks automation / Generate a personal access token.
Grant the user associated with the token read and write access to DBFS (see Important information about DBFS permissions).
If table access control has been enabled, also grant the user MODIFY and READ_METADATA privileges (see Data object privileges - Data governance model).
Authenticating to Databricks with Azure Active Directory (Azure AD)
In summary, configuring Azure AD requires the following steps:
Register the Striim app with the Azure AD identity provider (IdP).
Note the registered app's Client ID, Client Secret, and Tenant ID.
Make a request to the /authorize endpoint using the Postman app or the browser.
Authenticate to Azure AD.
Consent at the consent dialog box to obtain the authorization code.
Provide the authorization code and Client Secret to the /token endpoint to obtain the access and refresh tokens.
In detail:
Log in to the Azure Portal.
Register a new app.
Note the Application ID (referred to as Client ID in this procedure), the OAuth v2 authorization endpoint, and the OAuth v2 token endpoint.
Generate a new Client secret.
Note the Client Secret for future use.
Add the AzureDatabricks API permission.
When the external stage is ADLS Gen 2, also add the Azure Storage API permission.
The following procedure uses curl and the Web browser to fetch the refresh token.
Open the following URL in a Web browser.
https://login.microsoftonline.com/<tenant-id>/oauth2/v2.0/authorize?
client_id=<client-id>&
response_type=code&
redirect_uri=http%3A%2F%2Flocalhost%3A7734%2Fstriim-callback&
response_mode=query&
scope=2ff814a6-3304-4ab8-85cb-cd0e6f879c1d%2F.default%20offline_access
Replace <tenant-id> with the tenant ID of the registered app. Replace <client-id> with the client ID of the registered app. Provide valid authentication credentials if Azure Portal requests authentication.
The web browser redirects to the specified redirect URI. The authorization code is the part of the URI after the code= string. Note the authorization code for future use.
Execute the following curl command.
curl -X POST -H 'Content-Type: application/x-www-form-urlencoded' \
https://login.microsoftonline.com/<tenant-id>/oauth2/v2.0/token \
-d 'client_id=<client-id>' \
-d 'client_secret=<client_secret>' \
-d 'scope=2ff814a6-3304-4ab8-85cb-cd0e6f879c1d%2F.default%20offline_access' \
-d 'code=<authorization_code>' \
-d 'redirect_uri=http%3A%2F%2Flocalhost%3A7734%2Fstriim-callback' \
-d 'grant_type=authorization_code'
Replace <tenant-id> with the tenant ID of the registered app. Replace <client-id> with the client ID of the registered app. Replace <client_secret> with the client secret of the registered app. Replace <authorization_code> with the previously noted authorization code.
The call returns an object that contains an access_token key and a refresh_token key.
Note the value of the refresh_token key.
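The token endpoint returns a standard OAuth 2.0 JSON object. Abridged, with placeholder values, the response looks something like this:
{
  "token_type": "Bearer",
  "expires_in": 3599,
  "access_token": "<access-token-value>",
  "refresh_token": "<refresh-token-value>"
}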
The following procedure uses the Postman app to generate an access token.
Open the Postman app.
In the Authorization tab, set the authorization type to OAuth 2.0.
Configure values for the Client ID, Client secret, authorization URL and access token URL.
Set the value of the Scope field to 2ff814a6-3304-4ab8-85cb-cd0e6f879c1d/.default offline_access.
Set the value of the Callback URL field to the redirect URL determined in earlier procedures.
Click Get New Access Token.
Sign into Microsoft Azure and accept the app privilege requests at the consent dialog box.
Postman receives an access token and a refresh token in the response. Note the value of the refresh token.
When the External Stage type is ADLS Gen 2 and the authentication type is Azure AD, you must grant the service principal account the Storage Blob Data Contributor privilege before generating the access and refresh tokens.
The following sample TQL uses Azure AD authentication:
CREATE OR REPLACE TARGET db USING Global.DeltaLakeWriter (
  tenantID: '71bfeed5-1905-43da-a4a4-49d8490731da',
  connectionUrl: 'jdbc:spark://adb-8073469162361072.12.azuredatabricks.net:443/default;transportMode=http;ssl=1;httpPath=sql/protocolv1/o/8073469162361072/0301-101350-kprc8x3a;AuthMech=3;UID=token;PWD=<personal-access-token>',
  stageLocation: '/',
  CDDLAction: 'Process',
  adapterName: 'DeltaLakeWriter',
  authenticationType: 'AzureAD',
  ConnectionRetryPolicy: 'initialRetryDelay=10s, retryDelayMultiplier=2, maxRetryDelay=1m, maxAttempts=5, totalTimeout=10m',
  ClientSecret: 'untNjHnQOzsY90BjrKs2napohIP8WebUUcXybRdKVURH0XeklB5+Xw8NZgZUylqn',
  ClientSecret_encrypted: 'true',
  ClientID: 'dcf190e8-a315-42bb-a0b1-86063ff1c340',
  RefreshToken_encrypted: 'true',
  Mode: 'APPENDONLY',
  externalStageType: 'ADLSGen2',
  Tables: 'public.sample_pk,default.testoauth',
  azureAccountName: 'samplestorage',
  RefreshToken: '<refresh-token-value>',
  azureContainerName: 'striim-deltalakewriter-container',
  uploadPolicy: 'eventcount:10000,interval:60s'
) INPUT FROM sysout;
The following sample TQL uses personal access token authentication:
CREATE TARGET db USING Global.DeltaLakeWriter (
connectionUrl: 'jdbc:spark://adb-8073469162361072.12.azuredatabricks.net:443/default;
transportMode=http;ssl=1;
httpPath=sql/protocolv1/o/8073469162361072/0301-101350-kprc8x3a;
AuthMech=3;UID=token;PWD=<personal-access-token>',
azureAccountAccessKey: '2YoK5czZpmPjxSiSe7uFVXrb9jt9P4xrWp+NNKxWzjU=',
stageLocation: '/',
CDDLAction: 'Process',
ConnectionRetryPolicy: 'initialRetryDelay=10s, retryDelayMultiplier=2, maxRetryDelay=1m, maxAttempts=5, totalTimeout=10m',
authenticationType: 'PersonalAccessToken',
Mode: 'APPENDONLY',
externalStageType: 'ADLSGen2',
Tables: 'public.sample_pk,default.testoauth',
azureAccountName: 'samplestorage',
azureAccountAccessKey_encrypted: 'true',
personalAccessToken: 'GGR/zQHfh7wQa3vJhP6dcWtejN1UL+E8YEXc13g9+UZdTQmYN1h3E0d0jabboJsd',
personalAccessToken_encrypted: 'true',
uploadPolicy: 'eventcount:10000,interval:60s' )
INPUT FROM sysout;
Databricks Writer properties
When creating a Databricks Writer target in TQL, you must specify values for the Connection URL, Hostname, Personal Access Token, and Tables properties. If not specified, the other properties will use their default values.
property | type | default value | notes |
---|---|---|---|
Authentication Type | enum | PersonalAccessToken | With the default setting PersonalAccessToken, Striim's connection to Databricks is authenticated using the token specified in Personal Access Token. Set to AzureAD to authenticate using Azure Active Directory. In this case, specify Client ID, Client Secret, Refresh Token, and Tenant ID. See Databricks authentication mechanisms for details. |
CDDL Action | enum | Process | See Handling schema evolution. If TRUNCATE commands may be entered in the source and you do not want to delete events in the target, precede the writer with a CQ whose select statement filters out truncate operations. |
Client ID | string | | This property is required when AzureAD authentication is selected as the value of the Authentication Type property. |
Client Secret | encrypted password | | This property is required when AzureAD authentication is selected as the value of the Authentication Type property. |
Connection Retry Policy | String | initialRetryDelay=10s, retryDelayMultiplier=2, maxRetryDelay=1m, maxAttempts=5, totalTimeout=10m | Do not change unless instructed to by Striim support. |
Connection URL | String | | Provide the JDBC URL from the JDBC/ODBC tab of the Databricks cluster's Advanced options (see Get connection details for a cluster). If the URL starts with |
External Stage Type | enum | DBFSROOT | With the default value DBFSROOT (not recommended), events are staged to DBFS storage at the path specified in Stage Location. To use an external stage, your Databricks instance should be using Databricks Runtime 11.0 or later; when using serverless compute, you must select an external stage type. If running Databricks on AWS and you want to use S3, set to S3. In Striim 4.2.0.1 or later only, if running Databricks on AWS and you want to use a personal staging area, set to PersonalStagingLocation (see Using an AWS personal staging location). In Striim 4.2.0.4 or later only, if running Azure Databricks and you want to use a personal staging location, set to PersonalStagingLocation (see Using an Azure personal staging location). If running Azure Databricks and you want to use Azure Data Lake Storage Gen2, set to ADLSGen2. |
Hostname | String | | the Server Hostname from the JDBC/ODBC tab of the Databricks cluster's Advanced options (see Get connection details for a cluster) |
Ignorable Exception Code | String | | Set to TABLE_NOT_FOUND to prevent the application from terminating when Striim tries to write to a table that does not exist in the target. See Handling "table not found" errors for more information. Ignored exceptions will be written to the application's exception store (see CREATE EXCEPTIONSTORE). |
Mode | enum | AppendOnly | With the default value AppendOnly, updates and deletes in the source are handled as inserts in the target. Set to Merge to handle updates and deletes as updates and deletes instead. In Merge mode, partitioning the target tables significantly improves performance (see Limitations), and Parallel Threads is not supported. |
Optimized Merge | Boolean | false | In Flow Designer, this property will be displayed only when Mode is Merge. Set to True only when Mode is MERGE and the target's input stream is the output of an HP NonStop reader, MySQL Reader, or Oracle Reader source and the source events will include partial records. For example, with Oracle Reader, when supplemental logging has not been enabled for all columns, partial records are sent for updates. When the source events will always include full records, leave this set to false. |
Parallel Threads | Integer | | Not supported when Mode is Merge. |
Personal Access Token | encrypted password | | Used to authenticate with the Databricks cluster (see Generate a personal access token). The user associated with the token must have read and write access to DBFS (see Important information about DBFS permissions). If table access control has been enabled, the user must also have MODIFY and READ_METADATA (see Data object privileges - Data governance model). |
Personal Staging User Name | String | | When External Stage Type is PersonalStagingLocation, set as described in Using an AWS personal staging location or Using an Azure personal staging location. |
Refresh Token | encrypted password | | This property is required when AzureAD authentication is selected as the value of the Authentication Type property. |
Stage Location | String | | When the External Stage Type is DBFSROOT, the path to the staging area in DBFS, for example, /StriimStage/. |
Tables | String | | The name(s) of the table(s) to write to. The table(s) must exist in the database. Specify target table names in uppercase as <DATABASE>.<TABLE>. When the target's input stream is a user-defined event, specify a single table. The only special character allowed in target table names is underscore (_). When the input stream of the target is the output of a DatabaseReader, IncrementalBatchReader, or SQL CDC source (that is, when replicating data from one database to another), it can write to multiple tables. In this case, specify the names of both the source and target tables, and you may use the % wildcard, for example: source.emp,target_database.emp source_schema.%,target_catalog.target_database.% source_database.source_schema.%,target_database.% source_database.source_schema.%,target_catalog.target_database.% MySQL and Oracle names are case-sensitive, SQL Server names are not. Specify names as <schema>.<table> for MySQL and Oracle and as <database>.<schema>.<table> for SQL Server. See Mapping columns for additional options. |
Tenant ID | String | | This property is required when AzureAD authentication is selected as the value of the Authentication Type property. |
Upload Policy | String | eventcount:100000, interval:60s | The upload policy may include eventcount and/or interval (see Setting output names and rollover / upload policies for syntax). Buffered data is written to the storage account every time any of the specified values is exceeded. With the default value, data will be written every 60 seconds or sooner if the buffer contains 100,000 events. When the app is quiesced, any data remaining in the buffer is written to the storage account; when the app is undeployed, any data remaining in the buffer is discarded. |
Azure Data Lake Storage (ADLS) Gen2 properties for Databricks Writer
To use an ADLS Gen2 bucket as your staging area, your Databricks instance should be using Databricks Runtime 11.0 or later.
property | type | default value | notes |
---|---|---|---|
Azure Account Access Key | encrypted password | | When Authentication Type is set to PersonalAccessToken, specify the account access key from Storage accounts > <account name> > Access keys. When Authentication Type is set to AzureAD, this property is ignored in TQL and not displayed in the Flow Designer. |
Azure Account Name | String | | the name of the Azure storage account for the blob container |
Azure Container Name | String | striim-deltalakewriter-container | the blob container name from Storage accounts > <account name> > Containers. If it does not exist, it will be created. |
Amazon S3 properties for Databricks Writer
To use an Amazon S3 bucket as your staging area, your Databricks instance should be using Databricks Runtime 11.0 or later.
property | type | default value | notes |
---|---|---|---|
S3 Access Key | String | | an AWS access key ID (created on the AWS Security Credentials page) for a user with read and write permissions on the bucket. If the Striim host has default credentials stored in the AWS credentials file (~/.aws/credentials), you may leave this blank. |
S3 Bucket Name | String | striim-deltalake-bucket | Specify the S3 bucket to be used for staging. If it does not exist, it will be created. |
S3 Region | String | us-west-1 | the AWS region of the bucket |
S3 Secret Access Key | encrypted password | | the secret access key for the access key. If the Striim host has default credentials stored in the AWS credentials file (~/.aws/credentials), you may leave this blank. |
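For illustration, a target staging through S3 might look like the following sketch. The camelCase property names s3BucketName, s3Region, s3AccessKey, and s3SecretAccessKey are assumed to mirror the Flow Designer labels above, following the naming pattern of the Azure samples; confirm them against your Striim version.
-- hypothetical sketch: stage events through an S3 bucket instead of DBFS
CREATE TARGET DatabricksViaS3 USING DeltaLakeWriter (
  personalAccessToken: '*************************',
  hostname: 'adb-xxxx.xx.databricks.net',
  connectionUrl: 'jdbc:xxx.xx;transportMode=http;ssl=1;httpPath=xxx;AuthMech=3;UID=token;',
  tables: 'mydb.employee,mydatabase.employee',
  externalStageType: 'S3',
  s3BucketName: 'striim-deltalake-bucket',
  s3Region: 'us-west-1',
  s3AccessKey: '<access-key-id>',
  s3SecretAccessKey: '<secret-access-key>'
) INPUT FROM ns1.sourceStream;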
Using an AWS personal staging location
Using a personal staging location requires Striim 4.2.0.1 or later and Unity Catalog (see Documentation > Data Governance Guide > What is Unity Catalog?). You may not use a personal staging location when the Authentication Type is AzureAD.
To use a personal staging location as your staging area:
Create an S3 bucket as described in Documentation > Data Governance Guide > What is Unity Catalog? > Get started using Unity Catalog > Configure a storage bucket and IAM role in AWS.
Configure that S3 bucket as described in Documentation > Data Governance Guide > What is Unity Catalog? > Get started using Unity Catalog > Configure Unity Catalog storage account for CORS > Configure CORS settings for S3.
Set External Stage Type to PersonalStagingLocation.
For Personal Staging User Name, specify a user name or application ID for a user or service principal with a personal access token (see Documentation > Security and compliance guide > Authentication and access control > Manage personal access tokens):
If the personal access token is associated with a Databricks user, specify the user name.
If the personal access token is associated with the Databricks service principal, specify its Application Id (see Documentation > Developer tools and guidance > Use service principals with Databricks > Provision a service principal for Databricks automation - Databricks UI > Generate a Databricks personal access token for the Databricks service principal).
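A minimal sketch of such a target, assuming the TQL property name personalStagingUserName mirrors the Flow Designer label (the user name and connection values are placeholders):
-- hypothetical sketch: stage events through the Unity Catalog personal staging location
CREATE TARGET DatabricksPersonalStage USING DeltaLakeWriter (
  personalAccessToken: '*************************',
  hostname: 'adb-xxxx.xx.databricks.net',
  connectionUrl: 'jdbc:xxx.xx;transportMode=http;ssl=1;httpPath=xxx;AuthMech=3;UID=token;',
  tables: 'mydb.employee,mydatabase.employee',
  externalStageType: 'PersonalStagingLocation',
  personalStagingUserName: 'user@example.com'
) INPUT FROM ns1.sourceStream;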
Using an Azure personal staging location
Using a personal staging location requires Striim 4.2.0.4 or later and Unity Catalog (see Learn / Azure Databricks documentation / Data Governance / What is Unity Catalog?). You may not use a personal staging location when the Authentication Type is AzureAD.
To use a Personal Staging Location as your staging area:
Create a Unity Catalog metastore (see Learn / Azure Databricks documentation / Data Governance / Create a Unity Catalog metastore).
Configure that metastore as described in Learn / Azure Databricks documentation / Data Governance / Create a Unity Catalog metastore / Enable Azure Databricks management for personal staging locations.
Set External Stage Type to PersonalStagingLocation.
For Personal Staging User Name, specify a user name or application ID for a user or service principal with a personal access token (see Learn > Azure Databricks documentation > Administration Guide > Manage personal access tokens):
If the personal access token is associated with a Databricks user, specify the user name.
If the personal access token is associated with the Databricks service principal, specify its Application Id (see Learn / Azure Databricks documentation / Developer Tools / Provision a service principal for Azure Databricks automation - Azure Databricks UI).
Sample TQL application using Databricks Writer
Sample TQL in AppendOnly mode:
CREATE TARGET DatabricksAppendOnly USING DeltaLakeWriter (
  personalAccessToken: '*************************',
  hostname: 'adb-xxxx.xx.azuredatabricks.net',
  tables: 'mydb.employee,mydatabase.employee',
  stageLocation: '/StriimStage/',
  connectionUrl: 'jdbc:xxx.xx;transportMode=http;ssl=1;httpPath=xxx;AuthMech=3;UID=token;'
) INPUT FROM ns1.sourceStream;
Sample TQL in Merge mode with Optimized Merge set to True:
CREATE TARGET DatabricksMerge USING DeltaLakeWriter (
  personalAccessToken: '*************************',
  hostname: 'adb-xxxx.xx.azuredatabricks.net',
  tables: 'mydb.employee,mydatabase.employee',
  stageLocation: '/StriimStage/',
  connectionUrl: 'jdbc:xxx.xx;transportMode=http;ssl=1;httpPath=xxx;AuthMech=3;UID=token;',
  mode: 'MERGE',
  optimizedMerge: 'true'
) INPUT FROM ns1.sourceStream;
Databricks Writer data type support and mapping
See also Data type support & mapping for schema conversion & evolution.
TQL type | Delta Lake type |
---|---|
java.lang.Byte | binary |
java.lang.Double | double |
java.lang.Float | float |
java.lang.Integer | int |
java.lang.Long | bigint |
java.lang.Short | smallint |
java.lang.String | string |
org.joda.time.DateTime | timestamp |
For additional data type mappings, see Data type support & mapping for schema conversion & evolution.
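For example, a hypothetical Striim type whose fields exercise several of these mappings (the comments show the Delta Lake column type each field becomes):
CREATE TYPE EmployeeType (
  id Integer KEY,    -- java.lang.Integer, written as int
  salary Double,     -- java.lang.Double, written as double
  name String,       -- java.lang.String, written as string
  hiredAt DateTime   -- org.joda.time.DateTime, written as timestamp
);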