Striim Cloud 4.1.2 documentation

MongoDB

Striim supports MongoDB versions 2.6 through 5.0, as well as MongoDB and MongoDB Atlas on AWS, Azure, and Google Cloud Platform.

Striim provides a template for creating applications that read from MongoDB and write to Cosmos DB. See Creating an application using a template for details.

MongoDB setup

MongoDBReader reads data from the replica set oplog, so to use it you must be running a replica set (see Deploy a Replica Set or Convert a Standalone to a Replica Set, or your MongoDB cloud provider's documentation). For more information, see Replication and Replica Set Data Synchronization.

In InitialLoad mode, the user specified in MongoDBReader's Username property must have read access to all databases containing the specified collections. In Incremental mode, the user must have read access to the local database and the oplog.rs collection. The oplog is a capped collection, which means that the oldest data is automatically removed to keep it within the specified size. To support recovery, the oplog must be large enough to retain all data that may need to be recovered. See Oplog Size and Change Oplog Size for more information.
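The access rules above can be sketched as a small JavaScript helper that builds the roles array for a MongoDB user. This is only an illustration: readerRoles is a hypothetical name, and it assumes that MongoDB's built-in read role on each database is sufficient. The resulting object has the shape accepted by db.createUser() in the MongoDB shell.

```javascript
// Hypothetical helper (not part of Striim or MongoDB): builds the `roles`
// array for db.createUser() from the access rules described above.
function readerRoles(mode, databases) {
  if (mode === "InitialLoad") {
    // Read access to every database that contains a specified collection.
    return databases.map((db) => ({ role: "read", db }));
  }
  if (mode === "Incremental") {
    // Read access to the local database, which holds the oplog.rs collection.
    return [{ role: "read", db: "local" }];
  }
  throw new Error("Unknown mode: " + mode);
}

// Example: a user definition covering both modes for the database mydb.
const user = {
  user: "striim",
  pwd: "<password>",
  roles: readerRoles("InitialLoad", ["mydb"]).concat(readerRoles("Incremental", [])),
};
console.log(JSON.stringify(user, null, 2));
```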

For MongoDB Atlas, in InitialLoad mode, the user specified in MongoDBReader's Username property must have the atlasAdmin privilege (see Built-in Roles) as this is required to read the oplog.

To support recovery (see Recovering applications), the replica set's oplog must be large enough to retain all the events generated while Striim is offline.
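One way to reason about whether the oplog is large enough is to compare the time span it currently covers with the longest outage you want to recover from. The sketch below assumes an object shaped like the result of db.getReplicationInfo() in the MongoDB shell, whose timeDiffHours field gives the span between the oldest and newest oplog entries. The function name, the sample values, and the safety factor of 2 are assumptions made for illustration.

```javascript
// Hypothetical helper: decides whether the oplog window covers a planned outage.
// `info` mirrors the object returned by db.getReplicationInfo() in the shell;
// only timeDiffHours (the time span covered by the oplog) is used here.
// safetyFactor is an arbitrary margin chosen for this sketch.
function oplogCoversOutage(info, outageHours, safetyFactor = 2) {
  return info.timeDiffHours >= outageHours * safetyFactor;
}

// Sample values, made up for illustration.
const sampleInfo = { logSizeMB: 990, usedMB: 120, timeDiffHours: 36 };
console.log(oplogCoversOutage(sampleInfo, 12)); // compares 36 with 24
console.log(oplogCoversOutage(sampleInfo, 24)); // compares 36 with 48
```

If the check fails for the outage you need to tolerate, see Change Oplog Size.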

Using SSL and Kerberos authentication with MongoDB

To secure your authentication parameters, store the entire Security Config string in a vault (see Using vaults). For example, assuming your Kerberos realm is MYREALM.COM, its KDC is kerberos.myrealm.com, the path to the SSL trust store is /cacerts, the path to the SSL keystore file is /keystore.pkcs12, and the password for both stores is MyPassword, the Striim console commands to store the Security Config string with the key SSLKerberos in a vault named MongoDBVault would be:

CREATE VAULT MongoDBVault;
WRITE INTO MongoDBVault (
  vaultKey: "SSLKerberos",
  vaultValue: "RealmName:MYREALM.COM;
    KDC:kerberos.myrealm.com;
    KeyStore:/keystore.pkcs12;
    TrustStore:/cacerts;
    trustStorePassword:MyPassword;
    KeyStorePassword:MyPassword"
)

Enter READ ALL FROM MongoDBVault; to verify the contents.

You would then specify the Security Config as [[MongoDBVault.SSLKerberos]].
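If you generate the Security Config string with a script, the key:value pairs can be joined with semicolons as in the vault example above. The following JavaScript sketch uses a hypothetical helper name and assumes that the line breaks and indentation in the vault example are for readability only.

```javascript
// Hypothetical helper: joins key/value pairs into the semicolon-separated
// "Key:value;Key:value" form shown in the vault example above.
function buildSecurityConfig(settings) {
  return Object.entries(settings)
    .map(([key, value]) => `${key}:${value}`)
    .join(";");
}

// Key names are copied from the vault example; the values are the sample ones.
const securityConfig = buildSecurityConfig({
  RealmName: "MYREALM.COM",
  KDC: "kerberos.myrealm.com",
  KeyStore: "/keystore.pkcs12",
  TrustStore: "/cacerts",
  trustStorePassword: "MyPassword",
  KeyStorePassword: "MyPassword",
});
console.log(securityConfig);
```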

MongoDBReader properties

The MongoDB driver is bundled with Striim, so no installation is necessary.

The adapter properties are:

property

type

default value

notes

authDB

String

admin

Specify the authentication database for the specified username. If not specified, uses the admin database.

authType

enum

Default

Specify the authentication mechanism used by your MongoDB instance. The default setting uses MongoDB's default authentication mechanism, SCRAM. Other supported choices are GSSAPI, MONGODBCR, MONGODBX509, and SCRAMSHA1. Set to NoAuth if authentication is not enabled. 

Set to GSSAPI if you are using Kerberos.

Collections

String

The fully-qualified name(s) of the MongoDB collection(s) to read from, for example, mydb.mycollection. Separate multiple collections by commas.

You may use the $ wildcard, for example, mydb.$. The wildcard is allowed only at the end of the string: for example, mydb.prefix$ is valid, but mydb.$suffix is not.

Note that data will be read only from collections that exist when the Striim application starts. Additional collections added later will be ignored until the application is restarted.
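The wildcard rules for the Collections property can be illustrated with a short JavaScript sketch. It is not Striim's implementation: the function names are hypothetical, and it assumes that $ matches any remaining characters of the collection name.

```javascript
// Hypothetical helpers that mimic the rules stated above.
function isValidPattern(pattern) {
  const wildcardAt = pattern.indexOf("$");
  // Either no wildcard, or a single wildcard as the last character.
  return wildcardAt === -1 || wildcardAt === pattern.length - 1;
}

function matchesCollections(collectionsProperty, qualifiedName) {
  // Multiple collections are separated by commas.
  const patterns = collectionsProperty.split(",").map((p) => p.trim());
  const invalid = patterns.find((p) => !isValidPattern(p));
  if (invalid !== undefined) {
    throw new Error("Wildcard is allowed only at the end: " + invalid);
  }
  return patterns.some((pattern) =>
    pattern.endsWith("$")
      ? qualifiedName.startsWith(pattern.slice(0, -1)) // prefix match
      : qualifiedName === pattern // exact match
  );
}

console.log(matchesCollections("mydb.$", "mydb.employee"));
console.log(matchesCollections("mydb.prefix$", "mydb.other"));
```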

Connection Retry Policy

String

retryInterval=30, maxRetries=3

With the default setting, if a connection attempt is unsuccessful, the adapter will try again in 30 seconds (retryInterval). If the second attempt is unsuccessful, in 30 seconds it will try a third time (maxRetries). If that is unsuccessful, the adapter will fail and log an exception. Negative values are not supported.
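To make the timing concrete, the following JavaScript sketch parses a policy string of the same form and lists when each attempt would occur, in seconds after the first attempt. It follows the description above, in which the attempt numbered maxRetries is the last one. The function names and the sample values are hypothetical, and this is not the adapter's own code.

```javascript
// Hypothetical parser for a "retryInterval=<n>, maxRetries=<n>" string.
function parseRetryPolicy(text) {
  const policy = {};
  for (const part of text.split(",")) {
    const [name, value] = part.split("=").map((s) => s.trim());
    const number = Number(value);
    // Negative values are not supported.
    if (!Number.isInteger(number) || number < 0) {
      throw new Error("Invalid value for " + name + ": " + value);
    }
    policy[name] = number;
  }
  return policy;
}

// Seconds after the first attempt at which each attempt is made,
// assuming the last attempt is attempt number maxRetries.
function attemptOffsets({ retryInterval, maxRetries }) {
  return Array.from({ length: maxRetries }, (_, i) => i * retryInterval);
}

console.log(attemptOffsets(parseRetryPolicy("retryInterval=10, maxRetries=5")));
```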

Connection URL

String

When Mode is InitialLoad

  • and you are connecting to MongoDB with DNS SRV, specify mongodb+srv://<host name>/<authentication database>, for example, mongodb+srv://abcdev3.gcp.mongodb.net/mydb. If you do not specify a database, the connection will use admin.

  • and you are connecting to a sharded MongoDB instance with mongos, specify <IP address or host name>:<port> of the mongos instance.

  • and you are connecting to a sharded instance of MongoDB without mongos, specify <IP address or host name>:<port> for all instances of the replica set, separated by commas. For example: 192.168.1.1:27017, 192.168.1.2:27017, 192.168.1.3:27017.

When Mode is Incremental

  • and you are connecting to an unsharded instance of MongoDB, specify <IP address or host name>:<port> for all instances of the replica set, separated by commas. For example: 192.168.1.1:27017, 192.168.1.2:27017, 192.168.1.3:27017.

  • and you are connecting to a sharded instance of MongoDB, create a separate source for each shard. For each reader, specify all the instances of the replica set, separated by commas. For example: 192.168.1.1:27017, 192.168.1.2:27017, 192.168.1.3:27017.
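The two Connection URL forms described above can be sketched in JavaScript as follows. The helper name and its parameters are hypothetical, and the sketch assumes that host entries are joined with a plain comma.

```javascript
// Hypothetical helper: builds a Connection URL value in one of the two
// forms described above.
function connectionUrl({ srvHost, authDb, hosts }) {
  if (srvHost) {
    // DNS SRV form: mongodb+srv://<host name>/<authentication database>
    return "mongodb+srv://" + srvHost + (authDb ? "/" + authDb : "");
  }
  // Host list form: <IP address or host name>:<port>, separated by commas.
  return hosts.map(({ host, port }) => `${host}:${port}`).join(",");
}

console.log(connectionUrl({ srvHost: "abcdev3.gcp.mongodb.net", authDb: "mydb" }));
console.log(
  connectionUrl({
    hosts: [
      { host: "192.168.1.1", port: 27017 },
      { host: "192.168.1.2", port: 27017 },
      { host: "192.168.1.3", port: 27017 },
    ],
  })
);
```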

Exclude Collections

String

Any collections to be excluded from the set specified in the Collections property. Specify as for the Collections property.

Full Document Update Lookup

Boolean

False

With the default setting of False, for UPDATE events the JSONNodeEvent data field will contain only the _id and modified values.

Set to True to include the entire document. Note that the document will be the current version, and depending on other write operations that may have occurred between the update and the lookup, the returned document may differ significantly from the document at the time of the update. Enabling this setting may affect performance, since MongoDBReader will have to call the database to fetch more data.

Mode

String

InitialLoad

With the default setting, MongoDBReader will load all existing data using db.collection.find() and then stop. In this mode, MongoDBReader is not a CDC reader.

Set to Incremental to read CDC data continuously from the oplog.

Password

encrypted password

The password for the specified Username.

Quiesce on IL Completion

Boolean

False

Read Preference

String

primaryPreferred

See Read Preference Modes. Supported values are primary, primaryPreferred, secondary, secondaryPreferred, and nearest.

Security Config

String

See Using SSL and Kerberos authentication with MongoDB.

SSL Enabled

Boolean

False

If MongoDB requires SSL or individual MongoDB nodes are specified in the Connection URL, set to True (see Configure mongod and mongos for TLS/SSL).

Start Timestamp

String

Leave blank to read only new data. Specify a UTC DateTime value (for example, 2018-07-18T04:56:10) to read all data from that time forward or to wait to start reading until a time in the future. If the MongoDB and Striim servers are in different time zones, adjust the value to match the Striim time zone. If the oplog no longer contains data back to the specified time, reading will start from the beginning of the oplog.

If milliseconds are specified (for example, 2017-07-18T04:56:10.999), they will be interpreted as the incrementing ordinal for the MongoDB timestamp (see Timestamps).
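A MongoDB timestamp consists of a seconds-since-epoch value and an incrementing ordinal. The JavaScript sketch below shows how a Start Timestamp value could be split into those two parts under the interpretation described above. The function name is hypothetical, and the sketch always treats the input as UTC.

```javascript
// Hypothetical helper: splits a Start Timestamp value into the two parts
// of a MongoDB timestamp, treating the fractional part as the ordinal.
function toMongoTimestampParts(startTimestamp) {
  const millis = Date.parse(startTimestamp + "Z"); // "Z" marks the value as UTC
  if (Number.isNaN(millis)) {
    throw new Error("Invalid DateTime: " + startTimestamp);
  }
  return {
    seconds: Math.floor(millis / 1000), // seconds since the Unix epoch
    ordinal: millis % 1000, // 0 when no milliseconds are given
  };
}

console.log(toMongoTimestampParts("2018-07-18T04:56:10"));
console.log(toMongoTimestampParts("2017-07-18T04:56:10.999"));
```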

Username

String

A MongoDB user with access as described in MongoDB setup.

MongoDBReader JSONNodeEvent fields

The output type for MongoDBReader is JSONNodeEvent. The fields are:

data: contains the field names and values of a document, for example:

data: {"_id":2441,"company":"Striim","city":"Palo Alto"}

Updates include only the modified values. Deletes include only the document ID.

removedfields: contains the names of any fields deleted by the $unset operator. If no fields were deleted, the value of removedfields is null. For example:

removedfields: {"myField":true}

Or if no fields have been removed:

removedfields: null

metadata: contains the following elements:

  • CollectionName: the collection from which the document was read

  • OperationName: in InitialLoad mode, SELECT; in Incremental mode, INSERT, UPDATE, or DELETE (operations within a transaction are not included; see Oplog does not record operations within a transaction)

  • DatabaseName: the database of the collection

  • DocumentKey: the document ID (same as the _id value in data)

  • NameSpace: <database name>.<collection name>

  • TimeStamp: in InitialLoad mode, the current time of the Striim server when the document was read; in Incremental mode, the timestamp from the oplog

For example:

metadata: {"CollectionName":"employee","OperationName":"SELECT","DatabaseName":"test",
  "DocumentKey":1.0,"NameSpace":"test.employee","TimeStamp":1537433999609}

MongoDBReader example application and output

The following Striim application will write change data for the specified collection to SysOut. To run this yourself, replace striim and ****** with the user name and password for the MongoDB user account discussed in MongoDB setup, specify the correct connection URL for your instance, and replace mydb with the name of your database.

CREATE APPLICATION MongoDBTest;

CREATE SOURCE MongoDBIn USING MongoDBReader (
  Username:'striim',
  Password:'******',
  ConnectionURL:'192.168.1.10:27017',
  Mode:'Incremental',
  Collections:'mydb.employee'
) 
OUTPUT TO MongoDBStream;

CREATE TARGET MongoDBCOut
USING SysOut(name:MongoDB)
INPUT FROM MongoDBStream;

END APPLICATION MongoDBTest;

With the above application running, the following MongoDB shell commands:

use mydb;
db.employee.save({_id:1,"firstname":"Larry","lastname":"Talbot","age":10,"comment":"new record"});
db.employee.save({_id:1,"firstname":"Larry","lastname":"Talbot","age":40,"comment":"updated record"});
db.employee.update({_id:1},{$set:{"comment":"partial update"}});
db.employee.remove({_id:1});

would produce output similar to the following:

data: {"_id":1.0,"firstname":"Larry","lastname":"Talbot","age":10.0,"comment":"new record"}
metadata: {"CollectionName":"employee","OperationName":"INSERT","DatabaseName":"mydb",
  "DocumentKey":1.0,"NameSpace":"mydb.employee","TimeStamp":1537250474}
...
data: {"_id":1.0,"firstname":"Larry","lastname":"Talbot","age":40.0,"comment":"updated record"}
metadata: {"CollectionName":"employee","OperationName":"UPDATE","DatabaseName":"mydb",
  "DocumentKey":1.0,"NameSpace":"mydb.employee","TimeStamp":1537250474}
...
data: {"_id":1.0,"comment":"partial update"}
metadata: {"CollectionName":"employee","OperationName":"UPDATE","DatabaseName":"mydb",
  "DocumentKey":1.0,"NameSpace":"mydb.employee","TimeStamp":1537250474}
...
data: {"_id":1.0}
metadata: {"CollectionName":"employee","OperationName":"DELETE","DatabaseName":"mydb",
  "DocumentKey":1.0,"NameSpace":"mydb.employee","TimeStamp":1537250477}

The decimals in the numeric values (for example, 1.0 rather than 1) are added by the MongoDB Java driver and have no effect.

Note that output for the "partial" update and delete operations includes only the fields specified in the shell commands. See Replicating MongoDB data to Azure CosmosDB for discussion of the issues this can cause when writing to targets and how to work around those issues.
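To see why partial output matters to a consumer, the following JavaScript sketch applies events shaped like the output above to an in-memory copy of a collection. It is a hypothetical consumer, not how any Striim target works. It merges UPDATE events into the stored document, which is correct for partial updates but would leave stale fields behind if a full-document replacement dropped a field.

```javascript
// Hypothetical consumer: applies MongoDBReader-style events to a Map
// keyed by document ID.
function applyEvent(store, event) {
  const id = event.data._id;
  switch (event.metadata.OperationName) {
    case "SELECT":
    case "INSERT":
      store.set(id, { ...event.data });
      break;
    case "UPDATE": {
      // Only modified values may be present, so merge into the stored copy.
      const merged = { ...(store.get(id) || {}), ...event.data };
      // Drop any fields reported in removedfields (null when none were removed).
      for (const field of Object.keys(event.removedfields || {})) {
        delete merged[field];
      }
      store.set(id, merged);
      break;
    }
    case "DELETE":
      // Deletes carry only the document ID.
      store.delete(id);
      break;
    default:
      throw new Error("Unexpected operation: " + event.metadata.OperationName);
  }
  return store;
}

const store = new Map();
applyEvent(store, {
  data: { _id: 1, firstname: "Larry", lastname: "Talbot", age: 40, comment: "updated record" },
  metadata: { OperationName: "INSERT" },
  removedfields: null,
});
applyEvent(store, {
  data: { _id: 1, comment: "partial update" },
  metadata: { OperationName: "UPDATE" },
  removedfields: null,
});
console.log(store.get(1));
```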

Replicating MongoDB data to Azure CosmosDB

To replicate one or many MongoDB collections to Cosmos DB, specify multiple collections in the Collections properties of MongoDBReader and CosmosDBWriter. You may use wildcards ($ for MongoDB, % for Cosmos DB) to replicate all collections in a database, as in the example below, or specify multiple collections manually, as described in the notes for Cosmos DB Writer's Collections property.

You must create the target collections in Cosmos DB manually. Each collection's partition key name must match one of the fields in the MongoDB documents.
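Before creating the target collection, you may want to confirm that every source document actually contains the field you plan to use as the partition key. The JavaScript sketch below shows one way to do that for documents you have already fetched. The function name is hypothetical and only top-level partition key paths such as /name are handled.

```javascript
// Hypothetical check: returns the _id of each document that lacks the
// field named by a top-level partition key path such as "/name".
function docsMissingPartitionKey(docs, partitionKeyPath) {
  const field = partitionKeyPath.replace(/^\//, ""); // strip the leading slash
  return docs.filter((doc) => !(field in doc)).map((doc) => doc._id);
}

const sampleDocs = [
  { _id: 1, name: "employee1", company: "Striim" },
  { _id: 2, company: "Striim" },
];
console.log(docsMissingPartitionKey(sampleDocs, "/name"));
```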

Data will be read only from collections that exist when the source starts. Additional collections added later will be ignored until the source is restarted. When the target collection is in a fixed container (see Partition and scale in Azure Cosmos DB), inserts, updates, and deletes are handled automatically. When the target collection is in an unlimited container, updates require special handling and deletes must be done manually, as discussed below.

If you wish to run the examples, adjust the MongoDBReader properties and Cosmos DB Writer properties to reflect your own environment.

When the target collection is in a fixed container

Note

Writing to a target collection in a fixed container will not be possible until Microsoft fixes the bug discussed in this Azure forum discussion.

  1. In Cosmos DB, create database mydb containing the collection employee with partition key /name (note that the collection and partition key names are case-sensitive).

  2. In MongoDB, create the collection employee and populate it as follows:

    use mydb;
    db.employee.insertMany([
    {_id:1,"name":"employee1","company":"Striim","city":"Madras"},
    {_id:2,"name":"employee2","company":"Striim","city":"Seattle"},
    {_id:3,"name":"employee3","company":"Striim","city":"California"}
    ]);
  3. In Striim, run the following application to perform the initial load of the existing data:

    CREATE APPLICATION Mongo2CosmosInitialLoad; 
     
    CREATE SOURCE MongoDBIn USING MongoDBReader (
      Username:'striim',
      Password:'******',
      ConnectionURL:'<MongoDB connection string>',
      Collections:'mydb.$'
     )
    OUTPUT TO MongoDBStream;
     
    CREATE TARGET WriteToCosmos USING CosmosDBWriter (
      ServiceEndpoint: '<Cosmos DB connection string>',
      AccessKey: '<Cosmos DB account read-write key>',
      Collections: 'mydb.$,mydb.%'
    )
    INPUT FROM MongoDBStream;
     
    END APPLICATION Mongo2CosmosInitialLoad;

    After the application is finished, the Cosmos DB employee collection should contain the following:

    {
        "_id": 1,
        "name": "employee1",
        "company": "striim",
        "city": "madras",
        "id": "1.0",
        "_rid": "HnpSALVXpu4BAAAAAAAACA==",
        "_self": "dbs/HnpSAA==/colls/HnpSALVXpu4=/docs/HnpSALVXpu4BAAAAAAAACA==/",
        "_etag": "\"0800b33d-0000-0000-0000-5bb5aafa0000\"",
        "_attachments": "attachments/",
        "_ts": 1538632442
    }
    {
        "_id": 2,
        "name": "employee2",
        "company": "striim",
        "city": "seattle",
        "id": "2.0",
        "_rid": "HnpSALVXpu4BAAAAAAAADA==",
        "_self": "dbs/HnpSAA==/colls/HnpSALVXpu4=/docs/HnpSALVXpu4BAAAAAAAADA==/",
        "_etag": "\"2b00f87b-0000-0000-0000-5bb5aafb0000\"",
        "_attachments": "attachments/",
        "_ts": 1538632443
    }
    {
        "_id": 3,
        "name": "employee3",
        "company": "striim",
        "city": "california",
        "id": "3.0",
        "_rid": "HnpSALVXpu4BAAAAAAAAAA==",
        "_self": "dbs/HnpSAA==/colls/HnpSALVXpu4=/docs/HnpSALVXpu4BAAAAAAAAAA==/",
        "_etag": "\"2700ad2a-0000-0000-0000-5bb5aafb0000\"",
        "_attachments": "attachments/",
        "_ts": 1538632443
    }
    
  4. In Striim, run the following application to continuously replicate new data from MongoDB to Cosmos DB:

    CREATE APPLICATION Mongo2CosmosIncrementalFixedContainer; 
     
    CREATE SOURCE MongoDBIn USING MongoDBReader (
      Username:'striim',
      Password:'******',
      ConnectionURL:'<MongoDB connection string>',
      authType: 'NoAuth',
      Mode:'Incremental',
      startTimestamp: '<timestamp>',
      Collections:'mydb.$'
     )
    OUTPUT TO MongoDBStream;
    
    CREATE TARGET WriteToCosmos USING CosmosDBWriter (
      ServiceEndpoint:'<Cosmos DB connection string>',
      AccessKey:'<Cosmos DB account read-write key>',
      Collections:'mydb.$,mydb.%'
    )
    INPUT FROM MongoDBStream ;
    
    CREATE CQ SelectDeleteOperations
    INSERT INTO DeleteOpsStream
    SELECT META(MongoDBStream,"DatabaseName"),
      META(MongoDBStream,"CollectionName"),
      META(MongoDBStream,"DocumentKey")
    FROM MongoDBStream
    WHERE META(MongoDBStream,"OperationName").toString() = "DELETE";
    
    CREATE TARGET WriteIgnoredDeleteOps USING FileWriter (
      filename:'DeleteOperations.json'
    )
    FORMAT USING JSONFormatter()
    INPUT FROM DeleteOpsStream;
     
    END APPLICATION Mongo2CosmosIncrementalFixedContainer;
  5. In MongoDB, modify the employee collection as follows to add employee4, change employee1 and employee2, and remove employee3:

    use mydb;
    db.employee.save({_id:4,"name":"employee4","company":"Striim","city":"Palo Alto"});
    db.employee.save({_id:1,"name":"employee1","company":"Striim","city":"Seattle"});
    db.employee.update({_id:2},{$set : {"city":"Palo Alto"}});
    db.employee.remove({_id:3});

    Within 30 seconds, those changes should be replicated to the corresponding Cosmos DB collection with results similar to the following:

    {
        "_id": 1,
        "name": "employee1",
        "company": "striim",
        "city": "Seattle",
        "id": "1.0",
        "_rid": "HnpSALVXpu4BAAAAAAAACA==",
        "_self": "dbs/HnpSAA==/colls/HnpSALVXpu4=/docs/HnpSALVXpu4BAAAAAAAACA==/",
        "_etag": "\"0800b33d-0000-0000-0000-5bb5aafa0000\"",
        "_attachments": "attachments/",
        "_ts": 1538632442
    }
    {
        "_id": 2,
        "name": "employee2",
        "company": "striim",
        "city": "Palo Alto",
        "id": "2.0",
        "_rid": "HnpSALVXpu4BAAAAAAAADA==",
        "_self": "dbs/HnpSAA==/colls/HnpSALVXpu4=/docs/HnpSALVXpu4BAAAAAAAADA==/",
        "_etag": "\"2b00f87b-0000-0000-0000-5bb5aafb0000\"",
        "_attachments": "attachments/",
        "_ts": 1538632443
    }
    {
        "_id": 3,
        "name": "employee3",
        "company": "striim",
        "city": "california",
        "id": "3.0",
        "_rid": "HnpSALVXpu4BAAAAAAAAAA==",
        "_self": "dbs/HnpSAA==/colls/HnpSALVXpu4=/docs/HnpSALVXpu4BAAAAAAAAAA==/",
        "_etag": "\"2700ad2a-0000-0000-0000-5bb5aafb0000\"",
        "_attachments": "attachments/",
        "_ts": 1538632443
    }
    {
        "_id": 4,
        "name": "employee4",
        "company": "striim",
        "city": "Palo Alto",
        "id": "4.0",
        "_rid": "HnpSALVXpu4BAAAAAAAAAE==",
        "_self": "dbs/HnpSAA==/colls/HnpSALVXpu4=/docs/HnpSALVXpu4BAAAAAAAAAE==/",
        "_etag": "\"2700ad2a-0000-0000-0000-5bb5aafb0000\"",
        "_attachments": "attachments/",
        "_ts": 1538632443
    }
When the target collection is in an unlimited container

When a Cosmos DB collection is in an unlimited container, it must have a partition key, which must be specified when you create the collection.

  • When MongoDB save operations create new documents, all fields are included in MongoDBReader's output, so CosmosDBWriter can write to the correct partition.

  • When MongoDB save operations update existing documents, all fields are included in MongoDBReader's output, so CosmosDBWriter can use the partition key and document ID to update the correct target document.

  • MongoDB update operations do not include all fields, so the partition key may be missing from MongoDBReader's output. In those cases, the PartialRecordPolicy open processor retrieves the missing fields from MongoDB and adds them before passing the data to CosmosDBWriter.

  • MongoDB remove operations include only the document ID, so the partition key is missing from MongoDBReader's output. Since CosmosDBWriter would be unable to determine the correct partition, the application writes the database name, collection name, and document key to a DeleteOps collection in Cosmos DB.

CREATE APPLICATION Mongo2CosmosIncrementalUnlimitedContainer; 
 
CREATE SOURCE MongoDBIn USING MongoDBReader (
  Username:'striim',
  Password:'******',
  ConnectionURL:'<MongoDB connection string>',
  authType: 'NoAuth',
  Mode:'Incremental',
  startTimestamp: '<timestamp>',
  Collections:'mydb.$'
 )
OUTPUT TO MongoDBStream;

CREATE STREAM FilteredMongoDBStream OF Global.JsonNodeEvent;

CREATE CQ ExcludeDeleteOperations
INSERT INTO FilteredMongoDBStream
SELECT *
FROM MongoDBStream
WHERE META(MongoDBStream,"OperationName").toString() != "DELETE";

CREATE STREAM FullDocstream OF Global.JsonNodeEvent;

CREATE OPEN PROCESSOR CompletePartialDocs USING MongoPartialRecordPolicy ( 
  ConnectionURL:'<MongoDB connection string>', 
  authType:'NoAuth',
  OnMissingDocument: 'Process'
)
INSERT INTO FullDocstream
FROM FilteredMongoDBStream;

CREATE TARGET WriteToCosmos USING CosmosDBWriter (
  ServiceEndpoint:'<Cosmos DB connection string>',
  AccessKey:'<Cosmos DB account read-write key>',
  Collections:'mydb.$,mydb.%',
  IgnorableExceptionCode:'PARTITION_KEY_NOT_FOUND'
)
INPUT FROM FullDocstream;

CREATE CQ SelectDeleteOperations
INSERT INTO DeleteOpsStream
SELECT TO_STRING(META(MongoDBStream,"DatabaseName")) AS DatabaseName,
  TO_STRING(META(MongoDBStream,"CollectionName")) AS CollectionName,
  TO_STRING(META(MongoDBStream,"DocumentKey")) AS DocumentKey
FROM MongoDBStream
WHERE META(MongoDBStream,"OperationName").toString() = "DELETE";

CREATE TARGET WriteDeleteOpsToCosmos USING CosmosDBWriter (
  ServiceEndpoint:'<Cosmos DB connection string>',
  AccessKey:'<Cosmos DB account read-write key>',
  Collections:'mydb.DeleteOps'
)
INPUT FROM DeleteOpsStream;
 
END APPLICATION Mongo2CosmosIncrementalUnlimitedContainer;
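The routing performed by the application above can be summarized with a short JavaScript sketch. It is only a model of the data flow, not Striim code: the function name, the target labels, and the needsLookup flag are hypothetical, and the partition key is assumed to be a top-level field.

```javascript
// Hypothetical model of the data flow: decides what happens to one
// MongoDBReader-style event when the target has a partition key.
function routeEvent(event, partitionKeyField) {
  const { OperationName, DatabaseName, CollectionName, DocumentKey } = event.metadata;
  if (OperationName === "DELETE") {
    // Deletes carry only the document ID, so they are recorded for manual handling.
    return {
      target: "DeleteOps",
      record: {
        DatabaseName: String(DatabaseName),
        CollectionName: String(CollectionName),
        DocumentKey: String(DocumentKey),
      },
    };
  }
  // Inserts and full-document saves include the partition key; partial
  // updates may not, in which case the missing fields must be fetched first.
  return {
    target: "WriteToCosmos",
    needsLookup: !(partitionKeyField in event.data),
  };
}

console.log(
  routeEvent(
    {
      data: { _id: 2, city: "Palo Alto" },
      metadata: { OperationName: "UPDATE", DatabaseName: "mydb", CollectionName: "employee", DocumentKey: 2 },
    },
    "name"
  )
);
```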