MongoDB
Striim supports MongoDB versions 2.6 through 5.0, as well as MongoDB and MongoDB Atlas on AWS, Azure, and Google Cloud Platform.
Striim provides a template for creating applications that read from MongoDB and write to Cosmos DB. See Creating an application using a template for details.
MongoDB setup
MongoDBReader reads data from the Replica Set Oplog, so to use it you must be running a replica set (see Deploy a Replica Set or Convert a Standalone to a Replica Set, or your MongoDB cloud provider's documentation). For more information, see Replication and Replica Set Data Synchronization.
In InitialLoad mode, the user specified in MongoDBReader's Username property must have read access to all databases containing the specified collections. In Incremental mode, the user must have read access to the local database and the oplog.rs collection.
The oplog is a capped collection, which means that the oldest data is automatically removed to keep it within the specified size. To support recovery, the oplog must be large enough to retain all data that may need to be recovered. See Oplog Size and Change Oplog Size for more information.
For MongoDB Atlas, in Incremental mode, the user specified in MongoDBReader's Username property must have the atlasAdmin role (see Built-in Roles), as this is required to read the oplog.
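As an illustration of the self-managed MongoDB permissions described above, the following JavaScript sketch derives the roles array you might pass to db.createUser() in the mongo shell. It assumes the built-in read role is acceptable (your security policy may call for something narrower), it does not cover the MongoDB Atlas requirement, and requiredRoles is a hypothetical helper, not part of Striim or MongoDB.

```javascript
// Hypothetical helper (not part of Striim or MongoDB): list the built-in
// "read" roles a MongoDBReader user needs, following the rules above.
function requiredRoles(mode, collections) {
  if (mode === "Incremental") {
    // Incremental mode reads the oplog, which lives in local.oplog.rs.
    return [{ role: "read", db: "local" }];
  }
  if (mode !== "InitialLoad") {
    throw new Error("Unknown mode: " + mode);
  }
  // InitialLoad mode needs read access to each database that contains a
  // specified collection. Database names cannot contain dots, so the text
  // before the first dot of "mydb.employee" is the database name.
  const databases = new Set(
    collections.map((name) => {
      const dot = name.indexOf(".");
      if (dot <= 0) throw new Error("Expected database.collection: " + name);
      return name.slice(0, dot);
    })
  );
  return [...databases].map((db) => ({ role: "read", db }));
}

// The result can be used as the roles field of db.createUser(), e.g.
// requiredRoles("InitialLoad", ["mydb.employee", "hr.staff"])
//   -> [{ role: "read", db: "mydb" }, { role: "read", db: "hr" }]
```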
To support recovery (see Recovering applications), the replica set's oplog must be large enough to retain all the events generated while Striim is offline.
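One way to check whether the oplog is large enough is to compare its time window with the longest outage Striim must survive. In the mongo shell, rs.printReplicationInfo() reports the configured oplog size and the time span the oplog currently covers. The arithmetic is sketched below; the helpers are hypothetical, the inputs are your own estimates, and the default 1.5 safety factor is an arbitrary assumption rather than a Striim or MongoDB recommendation.

```javascript
// Back-of-the-envelope oplog sizing. Hypothetical helpers, not a Striim or
// MongoDB API; all inputs are estimates you supply.

// Oplog size needed to cover the longest expected Striim outage, with a
// safety margin for write bursts.
function requiredOplogGB(oplogGBPerHour, maxOfflineHours, safetyFactor = 1.5) {
  if (oplogGBPerHour < 0 || maxOfflineHours < 0 || safetyFactor < 1) {
    throw new RangeError("Rates and durations must be non-negative; safetyFactor must be >= 1");
  }
  return oplogGBPerHour * maxOfflineHours * safetyFactor;
}

// Hours of history an oplog of a given size retains at a given write rate.
function oplogWindowHours(oplogSizeGB, oplogGBPerHour) {
  return oplogSizeGB / oplogGBPerHour;
}

// Example: about 2 GB of oplog entries per hour, up to 12 hours offline.
// requiredOplogGB(2, 12) -> 36
// oplogWindowHours(50, 2) -> 25
```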
Using SSL and Kerberos authentication with MongoDB
To secure your authentication parameters, store the entire Security Config string in a vault (see Using vaults). For example, assuming your Kerberos realm is MYREALM.COM, its KDC is kerberos.myrealm.com, the path to the SSL trust store is /cacerts, the path to the SSL keystore file is /keystore.pkcs12, and the password for both stores is MyPassword, the Striim console commands to store the Security Config string with the key SSLKerberos in a vault named MongoDBVault would be:
CREATE VAULT MongoDBVault;
WRITE INTO MongoDBVault (
  vaultKey: "SSLKerberos",
  vaultValue: "RealmName:MYREALM.COM; KDC:kerberos.myrealm.com; KeyStore:/keystore.pkcs12; TrustStore:/cacerts; trustStorePassword:MyPassword; KeyStorePassword:MyPassword"
);
Enter READ ALL FROM MongoDBVault; to verify the contents.
You would then specify the Security Config as [[MongoDBVault.SSLKerberos]].
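If you generate the vault entry from a script, the Security Config string can be assembled from individual settings and checked before it is written. This sketch assumes the format is Key:Value pairs separated by semicolons, as in the example above; buildSecurityConfig and parseSecurityConfig are hypothetical helpers, and values that themselves contain semicolons are not handled.

```javascript
// Hypothetical helpers (not part of Striim). The "Key:Value; Key:Value"
// format is inferred from the vault example above.
function buildSecurityConfig(settings) {
  return Object.entries(settings)
    .map(([key, value]) => `${key}:${value}`)
    .join("; ");
}

function parseSecurityConfig(text) {
  const result = {};
  for (const pair of text.split(";")) {
    const trimmed = pair.trim();
    if (trimmed === "") continue;
    // Split on the first colon only, so values containing colons stay whole.
    const idx = trimmed.indexOf(":");
    if (idx < 0) throw new Error("Malformed entry: " + trimmed);
    result[trimmed.slice(0, idx).trim()] = trimmed.slice(idx + 1).trim();
  }
  return result;
}
```

The string returned by buildSecurityConfig is what would go in the vaultValue of the WRITE INTO command.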
MongoDBReader properties
The MongoDB driver is bundled with Striim, so no installation is necessary.
The adapter properties are:
property | type | default value | notes |
---|---|---|---|
authDB | String | admin | Specify the authentication database for the specified username. If not specified, uses the |
authType | enum | Default | Specify the authentication mechanism used by your MongoDB instance. The default setting uses MongoDB's default authentication mechanism, SCRAM. Other supported choices are GSSAPI, MONGODBCR, MONGODBX509, and SCRAMSHA1. Set to NoAuth if authentication is not enabled. Set to GSSAPI if you are using Kerberos. |
Collections | String | | The fully-qualified name(s) of the MongoDB collection(s) to read from, for example, mydb.mycollection. Separate multiple collections with commas. You may use the $ wildcard, for example, Note that data will be read only from collections that exist when the Striim application starts; additional collections added later will be ignored until the application is restarted. |
Connection Retry Policy | String | retryInterval=60, maxRetries=3 | With the default setting, if a connection attempt is unsuccessful, the adapter will try again in 30 seconds ( |
Connection URL | String | | When Mode is InitialLoad … When Mode is Incremental … |
Exclude Collections | String | | Any collections to be excluded from the set specified in the Collections property. Specify as for the Collections property. |
Full Document Update Lookup | Boolean | False | With the default setting of False, for UPDATE events the JSONNodeEvent data field will contain only the _id and modified values. Set to True to include the entire document. Note that the document will be the current version, and depending on other write operations that may have occurred between the update and the lookup, the returned document may differ significantly from the document at the time of the update. Enabling this setting may affect performance, since MongoDBReader must query the database to fetch the full document. |
Mode | String | InitialLoad | With the default setting, will load all existing data using Set to |
Password | encrypted password | | The password for the specified Username. |
Quiesce on IL Completion | Boolean | False | |
Read Preference | String | primaryPreferred | See Read Preference Modes. Supported values are primary, primaryPreferred, secondary, secondaryPreferred, and nearest. |
Security Config | String | | |
SSL Enabled | Boolean | False | If MongoDB requires SSL or individual MongoDB nodes are specified in the Connection URL, set to True (see Configure mongod and mongos for TLS/SSL). |
Start Timestamp | String | | Leave blank to read only new data. Specify a UTC DateTime value (for example, 2018-07-18T04:56:10) to read all data from that time forward or to wait to start reading until a time in the future. If the MongoDB and Striim servers are in different time zones, adjust the value to match the Striim time zone. If the oplog no longer contains data back to the specified time, reading will start from the beginning of the oplog. If milliseconds are specified (for example, 2017-07-18T04:56:10.999), they will be interpreted as the incrementing ordinal for the MongoDB timestamp (see Timestamps). |
Username | String | | A MongoDB user with access as described in MongoDB setup. |
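A MongoDB oplog timestamp is a pair: seconds since the Unix epoch plus an incrementing ordinal for operations within that second. The sketch below shows one way a Start Timestamp value could map onto that pair, following the notes above: the value is treated as UTC, and a three-digit fractional part is taken as the ordinal rather than as milliseconds. toOplogTimestamp is hypothetical, and using an ordinal of 0 when no fraction is given is an assumption.

```javascript
// Hypothetical helper: map a Start Timestamp string to the (t, i) pair of a
// MongoDB oplog timestamp. Assumes UTC input and exactly three fractional
// digits when a fraction is present.
function toOplogTimestamp(startTimestamp) {
  const m = /^(\d{4})-(\d{2})-(\d{2})T(\d{2}):(\d{2}):(\d{2})(?:\.(\d{3}))?$/.exec(startTimestamp);
  if (!m) {
    throw new Error("Expected yyyy-MM-ddTHH:mm:ss[.SSS]: " + startTimestamp);
  }
  const [y, mo, d, h, mi, s] = m.slice(1, 7).map(Number);
  return {
    t: Date.UTC(y, mo - 1, d, h, mi, s) / 1000, // whole seconds since the epoch
    i: m[7] === undefined ? 0 : Number(m[7]),   // ordinal within that second
  };
}
```

For example, "2017-07-18T04:56:10.999" yields the seconds for 04:56:10 UTC with ordinal 999, not 999 milliseconds past that second.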
MongoDBReader JSONNodeEvent fields
The output type for MongoDBReader is JSONNodeEvent. The fields are:
data: contains the field names and values of a document, for example:
data: {"_id":2441,"company":"Striim","city":"Palo Alto"}
Updates include only the modified values. Deletes include only the document ID.
removedfields: contains the names of any fields deleted by the $unset operator. If no fields were deleted, the value of removedfields is null. For example:
removedfields: {"myField":true}
Or if no fields have been removed:
removedfields: null
metadata: contains the following elements:
CollectionName: the collection from which the document was read
OperationName: in InitialLoad mode, SELECT; in Incremental mode, INSERT, UPDATE, or DELETE (operations within a transaction are not included; see Oplog does not record operations within a transaction)
DatabaseName: the database of the collection
DocumentKey: the document ID (same as the _id value in data)
NameSpace: <database name>.<collection name>
TimeStamp: in InitialLoad mode, the current time of the Striim server when the document was read; in Incremental mode, the timestamp from the oplog
For example:
metadata: {"CollectionName":"employee","OperationName":"SELECT","DatabaseName":"test", "DocumentKey":1.0,"NameSpace":"test.employee","TimeStamp":1537433999609}
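When testing a pipeline, it can help to confirm that events satisfy the relationships listed above: DocumentKey matches data._id, and NameSpace is DatabaseName.CollectionName. This hypothetical helper models an event as a plain object with data and metadata fields; it compares DocumentKey with data._id using strict equality, so it only suits primitive _id values (an ObjectId would need its own comparison).

```javascript
// Hypothetical test helper: report inconsistencies between an event's data
// and metadata. Returns an empty array when the event looks consistent.
function checkMetadata(event) {
  const md = event.metadata;
  const problems = [];
  if (md.NameSpace !== `${md.DatabaseName}.${md.CollectionName}`) {
    problems.push("NameSpace does not match DatabaseName.CollectionName");
  }
  // Strict equality: suitable for numeric or string _id values only.
  if (event.data && "_id" in event.data && event.data._id !== md.DocumentKey) {
    problems.push("DocumentKey does not match data._id");
  }
  return problems;
}
```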
MongoDBReader example application and output
The following Striim application will write change data for the specified collection to SysOut. To run this yourself, replace striim and ****** with the user name and password for the MongoDB user account discussed in MongoDB setup, specify the correct connection URL for your instance, and replace mydb with the name of your database.
CREATE APPLICATION MongoDBTest;
CREATE SOURCE MongoDBIn USING MongoDBReader (
  Username:'striim',
  Password:'******',
  ConnectionURL:'192.168.1.10:27017',
  Mode:'Incremental',
  Collections:'mydb.employee'
)
OUTPUT TO MongoDBStream;
CREATE TARGET MongoDBCOut USING SysOut(name:MongoDB)
INPUT FROM MongoDBStream;
END APPLICATION MongoDBTest;
With the above application running, the following MongoDB shell commands:
use mydb;
db.employee.save({_id:1,"firstname":"Larry","lastname":"Talbot","age":10,"comment":"new record"});
db.employee.save({_id:1,"firstname":"Larry","lastname":"Talbot","age":40,"comment":"updated record"});
db.employee.update({_id:1},{$set:{"comment":"partial update"}});
db.employee.remove({_id:1});
would produce output similar to the following:
data: {"_id":1.0,"firstname":"Larry","lastname":"Talbot","age":10.0,"comment":"new record"}
metadata: {"CollectionName":"employee","OperationName":"INSERT","DatabaseName":"mydb", "DocumentKey":1.0,"NameSpace":"mydb.employee","TimeStamp":1537250474}
...
data: {"_id":1.0,"firstname":"Larry","lastname":"Talbot","age":40.0,"comment":"updated record"}
metadata: {"CollectionName":"employee","OperationName":"UPDATE","DatabaseName":"mydb", "DocumentKey":1.0,"NameSpace":"mydb.employee","TimeStamp":1537250474}
...
data: {"_id":1.0,"comment":"partial update"}
metadata: {"CollectionName":"employee","OperationName":"UPDATE","DatabaseName":"mydb", "DocumentKey":1.0,"NameSpace":"mydb.employee","TimeStamp":1537250474}
...
data: {"_id":1.0}
metadata: {"CollectionName":"employee","OperationName":"DELETE","DatabaseName":"mydb", "DocumentKey":1.0,"NameSpace":"mydb.employee","TimeStamp":1537250477}
The MongoDB Java driver adds the decimals (for example, 1.0 instead of 1). They have no effect.
Note that output for the "partial" update and delete operations includes only the fields specified in the shell commands. See Replicating MongoDB data to Azure CosmosDB for discussion of the issues this can cause when writing to targets and how to work around those issues.
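To see how a downstream consumer might use these fields, the following sketch replays events like those above into an in-memory Map keyed by DocumentKey: SELECT and INSERT store the document, UPDATE merges the modified values and drops any fields named in removedfields, and DELETE removes the entry. applyEvent is a hypothetical illustration, not Striim code. Because it merges rather than replaces, a save() that replaced a document with one lacking some fields would leave those stale fields in place here.

```javascript
// Hypothetical illustration: apply MongoDBReader-style events to a Map
// that holds one document per DocumentKey.
function applyEvent(docs, event) {
  const { OperationName: op, DocumentKey: key } = event.metadata;
  switch (op) {
    case "SELECT": // InitialLoad mode
    case "INSERT":
      docs.set(key, { ...event.data });
      break;
    case "UPDATE": {
      // Partial updates carry only _id and the modified values, so merge
      // them into the stored copy, then drop fields removed by $unset.
      const merged = { ...(docs.get(key) || {}), ...event.data };
      for (const field of Object.keys(event.removedfields || {})) {
        delete merged[field];
      }
      docs.set(key, merged);
      break;
    }
    case "DELETE": // data contains only the document ID
      docs.delete(key);
      break;
    default:
      throw new Error("Unexpected OperationName: " + op);
  }
  return docs;
}
```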
Replicating MongoDB data to Azure CosmosDB
To replicate one or more MongoDB collections to Cosmos DB, specify the collections in the Collections properties of both MongoDBReader and CosmosDBWriter. You may use wildcards ($ for MongoDB, % for Cosmos DB) to replicate all collections in a database, as in the examples below, or specify multiple collections manually, as described in the notes for Cosmos DB Writer's Collections property.
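The wildcard mapping used in the examples ('mydb.$,mydb.%') can be read as "every collection in mydb goes to the same-named collection in the target database". The sketch below models that reading; the semantics (that $ matches any collection name and % stands for the matched source name) are an assumption for illustration, and mapCollection and splitNamespace are hypothetical helpers. See the Cosmos DB Writer documentation for the exact rules.

```javascript
// Split "db.collection" at the first dot (database names cannot contain dots).
function splitNamespace(ns) {
  const dot = ns.indexOf(".");
  if (dot <= 0) throw new Error("Expected database.collection: " + ns);
  return [ns.slice(0, dot), ns.slice(dot + 1)];
}

// Hypothetical: return the target collection for a source namespace under a
// "sourcePattern,targetPattern" mapping, or null if the source does not match.
function mapCollection(mapping, sourceNamespace) {
  const [sourcePattern, targetPattern] = mapping.split(",").map((s) => s.trim());
  if (!targetPattern) throw new Error("Expected 'source,target': " + mapping);
  const [patternDb, patternColl] = splitNamespace(sourcePattern);
  const [db, coll] = splitNamespace(sourceNamespace);
  if (db !== patternDb) return null;
  if (patternColl !== "$" && patternColl !== coll) return null;
  // A replacer function keeps "$" characters in coll from being treated as
  // special replacement patterns.
  return targetPattern.replace("%", () => coll);
}
```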
You must create the target collections in Cosmos DB manually. The partition key names must match one of the fields in the MongoDB documents.
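Cosmos DB partition keys are specified as paths such as /name. A quick way to confirm that MongoDB documents actually carry the partition key field is sketched below; partitionKeyValue is a hypothetical helper that handles simple nested paths like /address/zip and returns undefined when the field is absent, as it would be in a partial update that does not touch that field.

```javascript
// Hypothetical helper: resolve a Cosmos DB partition key path against a
// document. Returns undefined if any segment of the path is missing.
function partitionKeyValue(doc, path) {
  if (!path.startsWith("/")) {
    throw new Error("Partition key path must start with '/': " + path);
  }
  let current = doc;
  for (const segment of path.slice(1).split("/")) {
    if (current === null || typeof current !== "object" || !(segment in current)) {
      return undefined;
    }
    current = current[segment];
  }
  return current;
}
```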
Data will be read only from collections that exist when the source starts. Additional collections added later will be ignored until the source is restarted. When the target collection is in a fixed container (see Partition and scale in Azure Cosmos DB), inserts, updates, and deletes are handled automatically. When the target collection is in an unlimited container, updates require special handling and deletes must be done manually, as discussed below.
If you wish to run the examples, adjust the MongoDBReader properties and Cosmos DB Writer properties to reflect your own environment.
When the target collection is in a fixed container
Note
Writing to a target collection in a fixed container will not be possible until Microsoft fixes the bug discussed in this Azure forum discussion.
In Cosmos DB, create database mydb containing the collection employee with partition key /name (note that the collection and partition names are case-sensitive).
In MongoDB, create the collection employee and populate it as follows:
use mydb;
db.employee.insertMany([
  {_id:1,"name":"employee1","company":"Striim","city":"Madras"},
  {_id:2,"name":"employee2","company":"Striim","city":"Seattle"},
  {_id:3,"name":"employee3","company":"Striim","city":"California"}
]);
In Striim, run the following application to perform the initial load of the existing data:
CREATE APPLICATION Mongo2CosmosInitialLoad;
CREATE SOURCE MongoDBIn USING MongoDBReader (
  Username:'striim',
  Password:'******',
  ConnectionURL:'<MongoDB connection string>',
  Collections:'mydb.$'
)
OUTPUT TO MongoDBStream;
CREATE TARGET WriteToCosmos USING CosmosDBWriter (
  ServiceEndpoint: '<Cosmos DB connection string>',
  AccessKey: '<Cosmos DB account read-write key>',
  Collections: 'mydb.$,mydb.%'
)
INPUT FROM MongoDBStream;
END APPLICATION Mongo2CosmosInitialLoad;
After the application is finished, the Cosmos DB employee collection should contain the following:
{ "_id": 1, "name": "employee1", "company": "Striim", "city": "Madras", "id": "1.0", "_rid": "HnpSALVXpu4BAAAAAAAACA==", "_self": "dbs/HnpSAA==/colls/HnpSALVXpu4=/docs/HnpSALVXpu4BAAAAAAAACA==/", "_etag": "\"0800b33d-0000-0000-0000-5bb5aafa0000\"", "_attachments": "attachments/", "_ts": 1538632442 }
{ "_id": 2, "name": "employee2", "company": "Striim", "city": "Seattle", "id": "2.0", "_rid": "HnpSALVXpu4BAAAAAAAADA==", "_self": "dbs/HnpSAA==/colls/HnpSALVXpu4=/docs/HnpSALVXpu4BAAAAAAAADA==/", "_etag": "\"2b00f87b-0000-0000-0000-5bb5aafb0000\"", "_attachments": "attachments/", "_ts": 1538632443 }
{ "_id": 3, "name": "employee3", "company": "Striim", "city": "California", "id": "3.0", "_rid": "HnpSALVXpu4BAAAAAAAAAA==", "_self": "dbs/HnpSAA==/colls/HnpSALVXpu4=/docs/HnpSALVXpu4BAAAAAAAAAA==/", "_etag": "\"2700ad2a-0000-0000-0000-5bb5aafb0000\"", "_attachments": "attachments/", "_ts": 1538632443 }
In Striim, run the following application to continuously replicate new data from MongoDB to Cosmos DB:
CREATE APPLICATION Mongo2CosmosIncrementalFixedContainer;
CREATE SOURCE MongoDBIn USING MongoDBReader (
  Username:'striim',
  Password:'******',
  ConnectionURL:'<MongoDB connection string>',
  authType: 'NoAuth',
  Mode:'Incremental',
  startTimestamp: '<timestamp>',
  Collections:'mydb.$'
)
OUTPUT TO MongoDBStream;
CREATE TARGET WriteToCosmos USING CosmosDBWriter (
  ServiceEndpoint:'<Cosmos DB connection string>',
  AccessKey:'<Cosmos DB account read-write key>',
  Collections:'mydb.$,mydb.%'
)
INPUT FROM MongoDBStream;
CREATE CQ SelectDeleteOperations
INSERT INTO DeleteOpsStream
SELECT META(MongoDBStream,"DatabaseName"),
  META(MongoDBStream,"CollectionName"),
  META(MongoDBStream,"DocumentKey")
FROM MongoDBStream
WHERE META(MongoDBStream,"OperationName").toString() = "DELETE";
CREATE TARGET WriteIgnoredDeleteOps USING FileWriter (
  filename:'DeleteOperations.json'
)
FORMAT USING JSONFormatter()
INPUT FROM DeleteOpsStream;
END APPLICATION Mongo2CosmosIncrementalFixedContainer;
In MongoDB, modify the employee collection as follows to add employee4, update employee1 and employee2, and remove employee3:
use mydb;
db.employee.save({_id:4,"name":"employee4","company":"Striim","city":"Palo Alto"});
db.employee.save({_id:1,"name":"employee1","company":"Striim","city":"Seattle"});
db.employee.update({_id:2},{$set : {"city":"Palo Alto"}});
db.employee.remove({_id:3});
Within 30 seconds, those changes should be replicated to the corresponding Cosmos DB collection with results similar to the following:
{ "_id": 1, "name": "employee1", "company": "Striim", "city": "Seattle", "id": "1.0", "_rid": "HnpSALVXpu4BAAAAAAAACA==", "_self": "dbs/HnpSAA==/colls/HnpSALVXpu4=/docs/HnpSALVXpu4BAAAAAAAACA==/", "_etag": "\"0800b33d-0000-0000-0000-5bb5aafa0000\"", "_attachments": "attachments/", "_ts": 1538632442 }
{ "_id": 2, "name": "employee2", "company": "Striim", "city": "Palo Alto", "id": "2.0", "_rid": "HnpSALVXpu4BAAAAAAAADA==", "_self": "dbs/HnpSAA==/colls/HnpSALVXpu4=/docs/HnpSALVXpu4BAAAAAAAADA==/", "_etag": "\"2b00f87b-0000-0000-0000-5bb5aafb0000\"", "_attachments": "attachments/", "_ts": 1538632443 }
{ "_id": 3, "name": "employee3", "company": "Striim", "city": "California", "id": "3.0", "_rid": "HnpSALVXpu4BAAAAAAAAAA==", "_self": "dbs/HnpSAA==/colls/HnpSALVXpu4=/docs/HnpSALVXpu4BAAAAAAAAAA==/", "_etag": "\"2700ad2a-0000-0000-0000-5bb5aafb0000\"", "_attachments": "attachments/", "_ts": 1538632443 }
{ "_id": 4, "name": "employee4", "company": "Striim", "city": "Palo Alto", "id": "4.0", "_rid": "HnpSALVXpu4BAAAAAAAAAE==", "_self": "dbs/HnpSAA==/colls/HnpSALVXpu4=/docs/HnpSALVXpu4BAAAAAAAAAE==/", "_etag": "\"2700ad2a-0000-0000-0000-5bb5aafb0000\"", "_attachments": "attachments/", "_ts": 1538632443 }
When the target collection is in an unlimited container
When a Cosmos DB collection is in an unlimited container, it must have a partition key, which must be specified when you create the collection.
When MongoDB save operations create new documents, all fields are included in MongoDBReader's output, so CosmosDBWriter can write to the correct partition.
When MongoDB save operations update existing documents, all fields are included in MongoDBReader's output, so CosmosDBWriter can use the partition key and document ID to update the correct target document.
MongoDB update operations do not include all fields, so the partition key may be missing from MongoDBReader's output. In those cases, the MongoPartialRecordPolicy open processor retrieves the missing fields from MongoDB and adds them before passing the data to CosmosDBWriter.
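Conceptually, completing a partial update means fetching the current document by _id and adding any fields the event lacks, with the event's own values taking precedence. The sketch below illustrates that idea only; completeUpdate and fetchById (a stand-in for a MongoDB query) are hypothetical, and returning null for a missing document is an assumption, not the open processor's documented behavior. As with Full Document Update Lookup, the fetched document is the current version, which may differ from the document at the time of the update.

```javascript
// Hypothetical illustration of filling in a partial update from the source.
// fetchById(id) stands in for a MongoDB lookup and returns the current
// document, or undefined if it no longer exists.
function completeUpdate(eventData, fetchById) {
  const current = fetchById(eventData._id);
  if (current === undefined) return null; // assumption: signal "not found"
  // Fields from the event win; fields missing from the event (such as the
  // partition key) are taken from the fetched document.
  return { ...current, ...eventData };
}
```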
MongoDB remove operations include only the document ID, so the partition key is missing from MongoDBReader's output. Since CosmosDBWriter would be unable to determine the correct partition, the application writes the database name, collection name, and document key to a DeleteOps collection in Cosmos DB, so that you can delete the corresponding documents manually.
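The split performed by the application that follows can be modeled in a few lines: DELETE events become DeleteOps records holding the database name, collection name, and document key as strings, and all other events continue toward CosmosDBWriter. routeEvents is a hypothetical illustration; note that JavaScript's String(1.0) yields "1", so the key text may differ from what Striim's TO_STRING produces.

```javascript
// Hypothetical illustration: separate DELETE events into DeleteOps records
// and pass every other event through unchanged.
function routeEvents(events) {
  const toCosmos = [];
  const deleteOps = [];
  for (const event of events) {
    const md = event.metadata;
    if (md.OperationName === "DELETE") {
      deleteOps.push({
        DatabaseName: String(md.DatabaseName),
        CollectionName: String(md.CollectionName),
        DocumentKey: String(md.DocumentKey),
      });
    } else {
      toCosmos.push(event);
    }
  }
  return { toCosmos, deleteOps };
}
```

Each DeleteOps record identifies one document to delete by hand from the target collection.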

CREATE APPLICATION Mongo2CosmosIncrementalUnlimitedContainer;
CREATE SOURCE MongoDBIn USING MongoDBReader (
  Username:'striim',
  Password:'******',
  ConnectionURL:'<MongoDB connection string>',
  authType: 'NoAuth',
  Mode:'Incremental',
  startTimestamp: '<timestamp>',
  Collections:'mydb.$'
)
OUTPUT TO MongoDBStream;
CREATE STREAM FilteredMongoDBStream OF Global.JsonNodeEvent;
CREATE CQ ExcludeDeleteOperations
INSERT INTO FilteredMongoDBStream
SELECT * FROM MongoDBStream
WHERE META(MongoDBStream,"OperationName").toString() != "DELETE";
CREATE STREAM FullDocstream OF Global.JsonNodeEvent;
CREATE OPEN PROCESSOR CompletePartialDocs USING MongoPartialRecordPolicy (
  ConnectionURL:'<MongoDB connection string>',
  authType:'NoAuth',
  OnMissingDocument: 'Process'
)
INSERT INTO FullDocstream
FROM FilteredMongoDBStream;
CREATE TARGET WriteToCosmos USING CosmosDBWriter (
  ServiceEndpoint:'<Cosmos DB connection string>',
  AccessKey:'<Cosmos DB account read-write key>',
  Collections:'mydb.$,mydb.%',
  IgnorableExceptionCode:'PARTITION_KEY_NOT_FOUND'
)
INPUT FROM FullDocstream;
CREATE CQ SelectDeleteOperations
INSERT INTO DeleteOpsStream
SELECT TO_STRING(META(MongoDBStream,"DatabaseName")) AS DatabaseName,
  TO_STRING(META(MongoDBStream,"CollectionName")) AS CollectionName,
  TO_STRING(META(MongoDBStream,"DocumentKey")) AS DocumentKey
FROM MongoDBStream
WHERE META(MongoDBStream,"OperationName").toString() = "DELETE";
CREATE TARGET WriteDeleteOpsToCosmos USING CosmosDBWriter (
  ServiceEndpoint:'<Cosmos DB connection string>',
  AccessKey:'<Cosmos DB account read-write key>',
  Collections:'mydb.DeleteOps'
)
INPUT FROM DeleteOpsStream;
END APPLICATION Mongo2CosmosIncrementalUnlimitedContainer;