Azure Cosmos DB using Core (SQL) API
Azure Cosmos DB is a web-based NoSQL database from Microsoft. For more information, see Azure Cosmos DB, Common Azure Cosmos DB use cases, and Striim for Azure Cosmos DB.
Cosmos DB Reader reads documents from one or more Cosmos DB containers using Cosmos DB's native Core (SQL) API. Its output stream type is JSONNodeEvent, so it requires targets that can read JSONNodeEvents. For other targets, you must convert the output to a user-defined type (see Converting JSONNodeEvent output to a user-defined type for an example).
This reader sends both inserts and updates as inserts. This means that, to support replicating Cosmos DB documents, the writer must support upsert mode. In upsert mode, a new document (one whose id field does not match that of any existing document) is handled as an insert, and an update to an existing document (based on matching id fields) is handled as an update. For replication, this limits the choice of writers to Cosmos DB Writer and Mongo Cosmos DB Writer. Append-only targets such as files, blobs, and Kafka are also supported so long as they can handle a JSONNodeEvent input stream.
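To make the upsert behavior concrete, here is a minimal Python sketch. It is not Striim's implementation: the target is modeled as a plain dictionary keyed by id, and apply_upsert is a hypothetical helper that reports whether an incoming document was handled as an insert or as an update.

```python
from typing import Any, Dict

Document = Dict[str, Any]


def apply_upsert(target: Dict[str, Document], doc: Document) -> str:
    """Write doc into target keyed by its id; return 'insert' or 'update'."""
    doc_id = doc["id"]
    # A matching id means the document already exists, so this is an update.
    action = "update" if doc_id in target else "insert"
    target[doc_id] = dict(doc)  # the whole document replaces any earlier version
    return action


target: Dict[str, Document] = {}
first = apply_upsert(target, {"id": "1", "type": "plums", "quantity": "50"})
second = apply_upsert(target, {"id": "1", "type": "plums", "quantity": "40"})
print(first, second, target["1"]["quantity"])
```

Because the reader emits both kinds of change as inserts, it is the writer's lookup on id that decides which case applies.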
Be sure to provision sufficient Request Units (see Request Units in Azure Cosmos DB) to handle the volume of data you expect to read. If you do not, the reader may be unable to keep up with the source data.
Cosmos DB setup for Cosmos DB Reader
Request Units
Provision sufficient Request Units to handle the volume of data you expect to read. For more information, see Request Units in Azure Cosmos DB.
Capturing deletes
Azure Cosmos DB's change feed does not capture deletes. To work around this limitation:
Set time-to-live (TTL) to -1 on the container(s) to be read. One way to do this is described in Create an Azure Cosmos DB container with unique key policy and TTL. The -1 value means Cosmos DB will not automatically delete any documents, but Cosmos DB Reader will be able to set the TTL for individual documents in order to delete them.
Set Cosmos DB Reader's Cosmos DB Config property to:
{"Operations": {"SoftDelete": {"FieldName" : "IsDeleted","FieldValue" : "true"}}}
This will add "IsDeleted":"true" to the output for deleted documents, as shown by the example in Cosmos DB Reader example output. That will tell Cosmos DB Writer or Mongo Cosmos DB Writer to delete the corresponding document in the Cosmos DB target. Cosmos DB Reader will also set the TTL of the deleted document to 5, so Cosmos DB will delete it five seconds later.
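The soft-delete setting can be illustrated with a short Python sketch. The helper names soft_delete_config and is_soft_deleted are hypothetical and not part of Striim. The first builds the JSON string shown above, and the second shows how a downstream consumer might recognize a soft-deleted document from its data fields.

```python
import json
from typing import Any, Dict


def soft_delete_config(field_name: str = "IsDeleted", field_value: str = "true") -> str:
    """Build the JSON string for the Cosmos DB Config property."""
    return json.dumps(
        {"Operations": {"SoftDelete": {"FieldName": field_name, "FieldValue": field_value}}}
    )


def is_soft_deleted(data: Dict[str, Any], config_json: str) -> bool:
    """Return True if the document carries the configured soft-delete marker."""
    soft = json.loads(config_json)["Operations"]["SoftDelete"]
    return data.get(soft["FieldName"]) == soft["FieldValue"]


config = soft_delete_config()
deleted = {"id": "c6de", "brand": "Kraft Heinz", "IsDeleted": "true"}
live = {"id": "1d40", "brand": "Jerry's"}
print(config)
print(is_soft_deleted(deleted, config), is_soft_deleted(live, config))
```

Note that the marker value is the string "true", not a JSON boolean, which matches the configuration above.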
Cosmos DB Reader properties
The Azure Cosmos Java driver used by this reader is bundled with Striim.
property | type | default value | notes |
---|---|---|---|
Access Key | encrypted password | | The Primary Key or Secondary Key from the Keys read-only tab of your Cosmos DB account. |
Connection Retry Policy | String | retryInterval=60, maxRetries=3 | With the default setting, if a connection attempt is unsuccessful, the adapter will try again in 30 seconds ( |
Containers | String | | The fully-qualified name(s) of the container(s) to read from, for example, mydb.mycollection. Separate multiple containers with commas. Container names are case-sensitive. You may use the % wildcard. Note that data will be read only from containers that exist when the Striim application starts; containers added later will be ignored until the application is restarted. |
Cosmos DB Config | String | | Optionally, specify a JSON-format string with additional Cosmos DB options. For an example, see "Capturing deletes" in Cosmos DB setup for Cosmos DB Reader. |
Exclude Containers | String | | Any containers to be excluded from the set specified in the Containers property. Specify as for the Containers property. |
Fetch Size | Integer | 1000 | The number of documents the adapter will fetch in a single read operation. |
Mode | String | InitialLoad | With the default Set to |
Overload Retry Policy | String | maxRetryTimeInSecs=30, maxRetries=9 | This policy determines how the reader handles RequestRateTooLargeException errors received from Cosmos DB. |
Polling Interval | Integer | 10 | The time in milliseconds the adapter will wait before polling the change feed for new documents. With the default value of |
Quiesce on IL Completion | Boolean | False | |
Service Endpoint | String | | The URI from the Overview page of your Cosmos DB account. |
Start Timestamp | | | Optionally, in incremental mode, specify a start timestamp in one of the following formats: YYYY, YYYY-MM, YYYY-MM-DD, YYYY-MM-DD"T"hhTZD, YYYY-MM-DD"T"hh:mmTZD, YYYY-MM-DD"T"hh:mm:ssTZD, YYYY-MM-DD"T"hh:mm:ss.sssTZD. |
ThreadPool Size | Integer | 10 | The number of threads Striim will use for reading containers. If this number is lower than the number of containers being read, the containers will be read in round-robin fashion. If this number equals the number of containers, each thread will read from one container. If this number exceeds the number of containers, only as many threads as there are containers will be active. |
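The three ThreadPool Size cases can be pictured with a small Python sketch. Striim's actual scheduling is internal and not described here, so assign_containers is a hypothetical helper that only illustrates the arithmetic: containers are dealt out to worker slots in round-robin order, and it assumes that no more slots are active than there are containers.

```python
from typing import Dict, List


def assign_containers(containers: List[str], thread_pool_size: int) -> Dict[int, List[str]]:
    """Distribute containers over worker slots in round-robin order."""
    if thread_pool_size < 1:
        raise ValueError("thread_pool_size must be at least 1")
    # Assumption: threads beyond the number of containers have nothing to read.
    active = min(thread_pool_size, len(containers))
    plan: Dict[int, List[str]] = {slot: [] for slot in range(active)}
    for index, name in enumerate(containers):
        plan[index % active].append(name)
    return plan


containers = ["mydb.a", "mydb.b", "mydb.c"]
print(assign_containers(containers, 2))   # fewer threads than containers
print(assign_containers(containers, 3))   # one container per thread
print(assign_containers(containers, 10))  # default pool size, three slots in use
```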
Cosmos DB Reader JSONNodeEvent fields
The output type for Cosmos DB Reader is JSONNodeEvent. The fields are:
data: contains the field names and values of a document, for example:
data:{ "id":"1d40842b-f28d-4b29-b5bf-7168712c9807eanOlpyItG", "brand":"Jerry's", "type":"plums", "quantity":"50" }
metadata: contains the following elements:
CollectionName: the collection from which the document was read
DatabaseName: the database of the collection
DocumentKey: the value of the id field of the document
NameSpace: <database name>.<collection name>
OperationName: in InitialLoad mode, SELECT; in Incremental mode, INSERT or DELETE (for "soft deletes")
TimeStamp: the Unix epoch time at which the operation was performed in the source
For example:
metadata:{ "CollectionName":"container2", "OperationName":"SELECT", "DatabaseName":"testDB", "DocumentKey":"1d40842b-f28d-4b29-b5bf-7168712c9807eanOlpyItG", "NameSpace":"testDB.container2", "ResumeToken":"", "TimeStamp":1639999991 }
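As a rough illustration of how a consumer might interpret these elements, the following Python sketch splits NameSpace into its database and collection parts and converts TimeStamp to a UTC datetime. It assumes the TimeStamp value is in whole seconds, which the magnitude of the sample value suggests but which is not stated explicitly.

```python
from datetime import datetime, timezone

metadata = {
    "CollectionName": "container2",
    "OperationName": "SELECT",
    "DatabaseName": "testDB",
    "DocumentKey": "1d40842b-f28d-4b29-b5bf-7168712c9807eanOlpyItG",
    "NameSpace": "testDB.container2",
    "ResumeToken": "",
    "TimeStamp": 1639999991,
}

# NameSpace is <database name>.<collection name>; split on the first dot only.
database, collection = metadata["NameSpace"].split(".", 1)

# Assumption: TimeStamp is Unix epoch time in seconds.
event_time = datetime.fromtimestamp(metadata["TimeStamp"], tz=timezone.utc)

print(database, collection, event_time.isoformat())
```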
Cosmos DB Reader example application
The following application will read CDC data from Cosmos DB and write it to MongoDB.
CREATE APPLICATION CosmosToMongo recovery 5 second interval;

CREATE SOURCE CosmosSrc USING CosmosDBReader (
  CosmosDBConfig: '{\"Operations\": {\"SoftDelete\": {\"FieldName\" : \"IsDeleted\",\"FieldValue\" : \"true\"}}}',
  Mode: 'Incremental',
  AccessKey: '*******',
  Containers: 'src.emp',
  ServiceEndpoint: 'https://******.documents.azure.com:443/'
)
OUTPUT TO cout;

CREATE TARGET MongoTarget USING MongoDBWriter (
  collections: 'src.emp,targdb.emp',
  ConnectionURL: '******:27018',
  Password: '******',
  Username: 'myuser',
  AuthDB: 'targdb',
  upsertMode: 'true'
)
INPUT FROM cout;

END APPLICATION CosmosToMongo;
Cosmos DB Reader example output
Initial load
When Mode is InitialLoad, the OperationName is reported as SELECT, even though it is actually an insert.
JsonNodeEvent{ data:{ "id":"1d40842b-f28d-4b29-b5bf-7168712c9807eanOlpyItG", "brand":"Jerry's", "type":"plums", "quantity":"50" } metadata:{ "CollectionName":"container2", "OperationName":"SELECT", "DatabaseName":"testDB", "DocumentKey":"1d40842b-f28d-4b29-b5bf-7168712c9807eanOlpyItG", "NameSpace":"testDB.container2", "TimeStamp":1639999991 } userdata:null } removedfields:null };
Incremental - insert
JsonNodeEvent{ data:{ "id":"c6de96ef-d7f0-44a9-a5ab-e3d0298652afNvEDtXTPqO", "brand":"Kraft Heinz", "type":"kool-aid", "quantity":"50" } metadata:{ "CollectionName":"container2", "OperationName":"INSERT", "DatabaseName":"testDB", "DocumentKey":"c6de96ef-d7f0-44a9-a5ab-e3d0298652afNvEDtXTPqO", "NameSpace":"testDB.container2", "TimeStamp":1643876905 } userdata:null } removedfields:null };
Incremental - delete
Note that this includes the IsDeleted field discussed in Cosmos DB setup for Cosmos DB Reader.
JsonNodeEvent{ data:{ "id":"c6de96ef-d7f0-44a9-a5ab-e3d0298652afNvEDtXTPqO", "brand":"Kraft Heinz", "type":"kool-aid", "quantity":"50", "IsDeleted":"true" } metadata:{ "CollectionName":"container2", "OperationName":"DELETE", "DatabaseName":"testDB", "DocumentKey":"c6de96ef-d7f0-44a9-a5ab-e3d0298652afNvEDtXTPqO", "NameSpace":"testDB.container2", "TimeStamp":1643877020 } userdata:null } removedfields:null };
Cosmos DB Reader limitations
The change feed captures field-level updates as document replace operations, so the entire document will be read.
The change feed does not capture deletes. Use the "soft delete" approach to add an IsDeleted field with value true to documents that have been deleted in the source (see Cosmos DB setup for Cosmos DB Reader and Cosmos DB Reader example output).
If there are multiple replace operations on a document during the polling interval (see Cosmos DB Reader properties), only the last will be read.
When a document's id field is changed, the change feed treats it as an insert rather than a replace, so the previous version of the document with the old id field will not be overwritten, and it will remain in the target.
Document id fields must be unique across all partitions. Otherwise you may encounter errors or data corruption.
Multi-region writes are not supported.
The order of operations is guaranteed to be preserved only for events with the same partition key. The order of operations may not be preserved for events with different partition keys.
Cosmos DB's change feed timestamp (_ts) resolution is in seconds. Consequently, to avoid events being missing from the target, recovery will start one second earlier than the time of the last recovery checkpoint, so there may be some duplicate events.
The change feed does not capture delete operations. Consequently, recovery will not capture those operations, and the deleted documents will remain in the target.
Cosmos DB's change feed does not capture changes to deleted documents. Consequently, if the Striim application is offline when a document is changed, and the document is deleted before recovery starts, the changes will not be written to the target during recovery.
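The recovery limitation involving one-second timestamp resolution can be sketched in Python. This is an illustration under stated assumptions rather than Striim's recovery logic: the change feed is modeled as a list of (_ts, id, body) tuples, and changes_for_recovery and apply_all are hypothetical helpers. The sketch shows why restarting one second early produces duplicates, and why a target that upserts by id is unaffected by them.

```python
from typing import Dict, Iterable, List, Tuple

Change = Tuple[int, str, str]  # (_ts in whole seconds, document id, document body)


def changes_for_recovery(feed: Iterable[Change], checkpoint_ts: int) -> List[Change]:
    """Replay from one second before the checkpoint so no same-second change is missed."""
    restart_ts = checkpoint_ts - 1
    return [change for change in feed if change[0] >= restart_ts]


def apply_all(target: Dict[str, str], changes: Iterable[Change]) -> None:
    """Upsert by id, so replaying a duplicate leaves the target unchanged."""
    for _ts, doc_id, body in changes:
        target[doc_id] = body


feed = [(100, "a", "v1"), (101, "b", "v1"), (101, "a", "v2"), (102, "c", "v1")]
target = {"a": "v1", "b": "v1"}  # state written before a checkpoint taken at _ts 101
replayed = changes_for_recovery(feed, checkpoint_ts=101)
apply_all(target, replayed)
print(len(replayed), target)
```

In this run the changes at _ts 100 and the first change at _ts 101 are replayed even though they were already written, while the second change at _ts 101, which a restart strictly after the checkpoint second would have skipped, is picked up.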