Tool for running a high throughput data ingestion/transformation workload with MongoDB

Overview

Mongo Mangler

The mongo-mangler tool is a lightweight Python utility, which you can run from a low-powered machine to execute a high throughput data ingestion or transformation workload against a remote MongoDB database (whether self-managed or in Atlas). The utility can perform one, or a combination, of the following actions:

  • Data Expansion. Inflate the contents of an existing collection to a new larger collection by duplicating its documents. For example, expand a data set of 1 thousand documents to one with 1 billion documents ready to be used for testing workloads at scale.
  • Data Faking. Generate a large set of documents from scratch, populating the fields of each document with randomly generated values according to a set of rules. For example, create a massive collection of documents representing fictitious customers with randomly generated personal details, ready to be used in a performance benchmark test.
  • Dask Masking. Transform the contents of a set of documents into a new collection of similar documents but with some fields obfuscated. For example, mask every customer record's surname and birth date with the original values partly redacted and randomly adjusted, respectively, ready for the data set to be distributed to a 3rd party.

The mongo-mangler tool allows you to optionally provide a custom MongoDB aggregation pipeline. In the pipeline, you can define whatever data transformation logic you want. This project also provides a convenient library of functions representing common data faking and masking tasks which you can easily re-use from your custom pipeline.

Performance

The mongo-mangler tool is designed to generate/process a large amount of data in a short space of time. The following table illustrates some of the performance levels that can be achieved:

 

Number of Docs Test Type Exec Time (s) Exec Time (m) Average Processing Rate
100 million Inflate from 1 to 100m by duplication 299 secs ~ 5 mins 335k docs/sec
100 million Inflate from 1 to 100m by generating fakes 565 secs ~ 9.5 mins 177k docs/sec
100 million Transform 100m to 100m by masking 664 secs ~ 11 mins 150k docs/sec
1 billion Inflate from 1 to 1b by duplication 3022 secs ~ 50 mins 331k docs/sec

 

The test environment used to produce the outlined results consisted of:

  • MongoDB version: 5.0
  • MongoDB deployment topology: A single unsharded replica set consisting of 3 replicas
  • Collection and ingestion specifics: 0.45 kb average-sized documents, no secondary indexes defined, write concern of 1 configured for the ingestion (merging) workload
  • Host machine specification per replica: Linux VM, Intel Xeon processor, 16 cores, 64GB RAM, 3000 storage IOPS (non-provisioned), 500GB storage volume (i.e. an Atlas M60 tier in AWS)
  • Client workstation specification: Just a regular laptop with low-bandwidth connectivity (essentially it will just be idling throughout the test run, mostly blocking to wait for responses to the aggregations it's issued against the database).

How High Performance Is Achieved

The mongo-mangler tool uses several tactics to maximise the rate of documents created in the target collection:

  • Aggregation Merge. Issues aggregation pipelines against the target database deployment, using a $merge stage at the end of a pipeline to stream the output directly to another collection in the same database deployment. This means that the compute-intensive processing work is pushed to the database cluster's hardware instead of passing data back and forth from the client machine and using the client machine's hardware resources.
  • Parallel Processing. Divides up the task of copying and transforming input records into multiple batches, each executing an aggregation pipeline in a sub-process against a subset of data, in parallel.
  • Temporary Intermediate Collections. When copying data from a small collection (e.g, of a thousand records) to a new larger collection (e.g. to a billion records), uses temporary intermediate collections to step up the data size (e.g. uses temporary collections for one hundred thousand records and for ten million records).
  • Shard Key Pre-splitting. When running against a sharded cluster, first pre-splits the new target collection to contain an evenly balanced set of empty chunks. The tool supports both hash-based and range-based sharding. For range-based sharding, the tool first analyses the shape of data in the original source collection to determine the split-points and uses this information to configure an empty target collection (even for a compound shard key).

Customisability Library For Faking And Masking Data

The mongo-mangler tool also provides a set of library functions to assist you in quickly generating fake data or masking existing data. These functions produce boilerplate compound aggregation expressions code, which you can reference from your $set/$addFields/$project stages in your custom pipeline.

Faking Library

The fake_accounts example pipeline provided in this project shows an example of how to generate fictitious bank account records using the supplied faker library. Below is the list of faking functions the library provides for use in your custom pipelines, with descriptions for each:

// Generate a random date between now and a maximum number of milliseconds from now
fakeDateAfterNow(maxMillisFromNow)

// Generate a random date between a maximum number of milliseconds before now and now
fakeDateBeforeNow(maxMillisBeforeNow)

// Generate a whole number up to a maximum number of digits (any more than 15 are ignored)
fakeNumber(numberOfDigits)

// Generate a while number between a given minimum and maximum number (inclusive)
fakeNumberBounded(minNumber, maxNumber)

// Generate a text representation of whole number a specific number of digits (characters) in length
fakePaddedNumberAsText(numberOfDigits)

// Generate a decimal number between 0.0 and 1.0 with up to 16 decimal places
fakeDecimal()

// Generate a decimal number with up to a specified number of significant places (e.g. '3' places -> 736.274473638742)
fakeDecimalSignificantPlaces(maxSignificantPlaces)

// Generate a True or False value randomly
fakeBoolean()

// Generate a True or False value randomly but where True is likely for a specified percentage of invocations (e.g. 40 -> 40% likely to be True)
fakeBooleanWeighted(targetAvgPercentTrue)

// Randomly return one value from a provided list
fakeValueFromList(listOfValues)

// Randomly return one value from a provided list but where values later in the list are more likely to be returned on average
fakeValueFromListWeighted(listOfValues)

// Generate an array of sub-documents with the specified size, where each item is randomly taken from the input list
fakeListOfSubDocs(numSumDocs, listOfValues)

// Generate string composed of the same character repeated the specified number of times 
fakeNChars(char, amount)

// Generate a typical first name from an internal pre-defined list of common first names
fakeFirstName()

// FAKE DATA:Generate a typical last name from an internal pre-defined list of common last names
fakeLastName()

// Generate a typical street name from an internal pre-defined list of common street names
fakeStreetName()

// Generate a typical town name from an internal pre-defined list of common town names
fakeTownName()

// Randomly return the name of one of the countries in the world
fakeCountryName()

// Generate a random US-style zipcode/postcode (e.g. 10144)
fakeZipCode()

Masking Library

The mask_accounts example pipeline provided in this project shows an example of how to transform the fictitious bank account records using the supplied mask library. Below is the list of masking functions the library provides for use in your custom pipelines, with descriptions for each:

// Replace the first specified number of characters in a field's value with 'x's
maskReplaceFirstPart(strOrNum, amount)

// Replace the last specified number of characters in a field's value with 'x's
maskReplaceLastPart(strOrNum, amount)

// Replace all the characters in a field's value with 'x's
maskReplaceAll(strOrNum)

// Change the value of a decimal number by adding or taking away a random amount up to a maximum percentage of its current value (e.g. change current value by + or - 10%)
maskAlterDecimal(currentValue, percent)

// Change the value of a whole number by adding or taking away a random amount up to a maximum percentage of its current value, rounded (e.g. change current value by + or - 10%)
maskAlterNumber(currentValue, percent)

// Change the value of a datetime by adding or taking away a random amount up to a maximum percentage of its current value (e.g. change current value by + or - 10%)
maskAlterDate(currentValue, maxChangeMillis)

// Return the same boolean value for a given percentage of time (e.g 40%), and for the rest of the time return the opposite value
maskAlterBoolean(currentValue, percentSameValue)

// Return the same value for a given percentage of time (e.g 40%), and for the rest of the time return a random value from the given list
maskAlterValueFromList(currentValue, percentSameValue, otherValuesList)

// Change on average a given percentage of the list members values to a random value from the provided alternative list
maskAlterListFromList(currentList, percentSameValues, otherValuesList)

Note, for data masking, even though the pipeline is irreversibly obfuscating fields, this doesn't mean that the masked data is useless for performing analytics to gain insight. A pipeline can mask most fields by fluctuating the original values by a small but limited random percentage (e.g. vary a credit card's expiry date or transaction amount by +/- 10%), rather than replacing them with completely random new values. In such cases, if the input data set is sufficiently large, then minor variances will be equaled out. For the fields that are only varied slightly, analysts can derive similar trends and patterns from analysing the masked data as they would the original data. See the Mask Sensitive Fields chapter of the Practical MongoDB Aggregations book for more information.

How To Run

Prerequisites

Ensure you have a running MongoDB cluster (self-managed or running in Atlas) which is network accessible from your client workstation.

Ensure you are connecting to the MongoDB cluster with a database user which has read privileges for the source database and read + write privileges the target database. If you are running a Sharded cluster, the database user must also have the privileges to run the 'enablingSharding' and 'splitChunk' commands. If this Sharded cluster is on Atlas, you would typically need to assign the 'Atlas Admin' role to the database user.

On your client workstation, ensure you have Python 3 (version 3.8 or greater) and the MongoDB Python Driver (PyMongo) installed. Example to install PyMongo:

pip3 install --user pymongo

Ensure the mongo-mangler.py file is executable on your host workstation.

Run With No Parameters To View Full Help Options

In a terminal, execute the following to view the tool's help instructions and the full list of options you can invoke it with:

./mongo-mangler.py -h

Inflate Existing Collection Of Data To A Far Larger Collection

Ensure you have a database with a collection, ideally containing many sample documents with similar fields but varying values. This will enable a newly expanded collection to reflect the shape and variance of the source collection, albeit with duplicated records. As a handy example, if you are using Atlas, you can quickly load the Atlas sample data set) via the Atlas Console, which contains movies data.

From the root folder of this project, execute the following to connect to a remote MongoDB cluster to copy and expand the data from an existing collection, sample_mflix.movies, to an a new collection, testdb.big_collection, which will contain 10 million documents:

./mongo-mangler.py -m "mongodb+srv://myusr:[email protected]/" -d 'sample_mflix' -c 'movies' -o 'testdb' -t 'big_collection' -s 10000000

NOTE 1: Before running the above command, first change the URL's username, password, and hostname, to match the URL of your running MongoDB cluster, and if not using the Atlas sample data set, change the values for the source database and collection names.

NOTE 2: If executing the command against a sharded cluster, by default a hash-based shard key will be configured based on the _id field. To use a range-based shard key, provide the --shardkey parameter in the caommand line, providing the name of the field to use as the shard key (or to specify a compound key, provide a string of comma separated field names, with no spaces between the field names).

Generate A New Large Collection From Scratch With Fake Data

No input collection is required, although one can be used, to provide some hard-coded document structure for every document generated.

To use the example faking aggregation pipeline provided in this project for generating random customer records data, execute the following to connect to a remote MongoDB cluster to generate a new collection, testdb.big_collection, which will contain 10 million documents of fake data:

./mongo-mangler.py -m "mongodb+srv://myusr:[email protected]/" -o 'testdb' -t 'big_collection' -s 10000000 -p 'examples/pipeline_example_fake_accounts.js'

NOTE 1: Before running the above command, first change the URL's username, password, and hostname, to match the URL of your running MongoDB cluster.

NOTE 2: You can of course construct your own pipeline containing whatever aggregation stages and operators you want and using whichever of the supplied faking library functions you require - in the above command change the name of the pipeline to reference the pipeline you've created.

Transform An Existing Collection To A Collection Of Same Size With Obfuscated Values

Ensure you have a database with a collection containing the set of existing documents to be transformed. The example provided here will use the existing fake collection created in the previous step, but in a real situation, you would declare the source collection to one that contains real-world sensitive data.

To use the example masking aggregation pipeline provided in this project for masking values in an existing customer records collection, execute the following to connect to a remote MongoDB cluster to generate a new collection, testdb.big_collection, which will contain 10 million documents of fake data:

./mongo-mangler.py -m "mongodb+srv://myusr:[email protected]/" -d 'testdb' -c 'big_collection' -t 'masked_big_collection' -s 10000000 -p 'examples/pipeline_example_mask_accounts.js'

NOTE 1: Before running the above command, first change the URL's username, password, and hostname, to match the URL of your running MongoDB cluster, and if using a different source collection of real data, change the values for the source database and collection names.

NOTE 2: You can of course construct your own pipeline containing whatever aggregation stages and operators you want and using whichever of the supplied masking library functions you require - in the above command change the name of the pipeline to reference the pipeline you've created.

Prototyping Your Custom Faked/Masked Aggregation Pipelines

Developing The Pipeline Interactively With The MongoDB Shell

The examples sub-folder contains example pipelines for faking and masking customer data. When you modify one of these pipelines or create a new pipeline, there is a handy way to test your pipeline changes before trying to use the pipeline with mongo-mangler.py for mass data processing.

You define the pipeline in JavaScript even though mongo-mangler.py is actually written in Python. This makes it easy to first prototype your aggregation pipeline code using the MongoDB Shell mongosh. For example to prototype a new pipeline you might execute the following from a terminal in the project's root folder to start an interactive MongoDB Shell session and construct and then run a custom MongoDB Aggregation pipeline which uses this project's faking library:

mongosh "mongodb://localhost:27017"
load("lib/masksFakesGeneraters.js")  // Load the faking/masking library
use test
db.dummycollection.insertOne({})  // Create a dummy doc in a collection with just an '_id' field

pipeline = [
    // A pipeline which will randomly generate surname and date of birth fields
    {"$set": {
        "lastname": fakeLastName(), 
        "dateOfBirth": fakeDateBeforeNow(100*365*24*60*60*1000),  // Up to 100 years ago
    }},
]

db.dummycollection.aggregate(pipeline)

Note, if you have saved to a file your pipeline variable containing the aggregation pipeline code (e.g. saved to to my_test_pipeline.js in the project's root folder), you can load the file's pipeline into the interactive shell with a command similar to: load("my_test_pipeline.js") before running aggregate(pipeline).

Testing Your Custom Pipeline With A MongoDB Shell Script

Once you've finished prototyping a pipeline, you need to save it to a file with the pipeline code encapsulated in a array variable called pipeline, for example:

pipeline = [
    // paste your pipeline JavaScript code here
]

IMPORTANT: When defining the pipeline variable in the file, do not include any JavaScript variable qualifier such as let, var or const, because the pipeline code will be converted to Python on the fly by mongo-mangler.py tool. The tool has only limited JavaScript-to-Python conversion capabilities.

The examples sub-folder contains two sample pipelines (one for faking and one for masking) and also contains a test MongoDB Shell script and a test Python script, for you to test the example pipelines or your custom pipeline, before you use your pipeline file when running mongo-mangler.py.

To test the pipeline file when run as a part of a script with the MongoDB Shell, from a terminal, change directory to the examples sub-folder and executed the provided test script test-agg-mongosh.js via mongosh. For example, to test the fake example pipeline against a locally running MongoDB database, execute:

mongosh --quiet 'mongodb://localhost:27017' test-agg-mongosh.js

You can change the following constants in the test-agg-mongosh.js file to match your envuronment and specific pipeline file: DO_FAKE_RATHER_THAN_MASK, DB, COLL, LIB_FILE, FAKE_AGG_FILE, MASK_AGG_FILE.

Testing Your Custom Pipeline With A Python Script

It is recommended to also test the same JavaScript pipeline file with the test Python script too, to ensure the JavaScript pipeline code has been translated correctly to Python on the fly. For example, to test the fake example pipeline with the test Python script, from the same examples sub-folder, execute:

./test-agg-python.py

The Python script test-agg-python.py also contains similar constants which you can change to match your environment and specific pipeline file.

Owner
Paul Done
Paul Done
Replite - An embeddable REPL powered by JupyterLite

replite An embeddable REPL, powered by JupyterLite. Usage To embed the code cons

Jeremy Tuloup 47 Nov 09, 2022
An example project that shows how to check if a certain macro is active in a file.

PlatformIO Check Compiler Flags Example Description Demonstrates the usage of an extra script and a special compilter invocation to get the active mac

Maximilian Gerhardt 1 Oct 28, 2021
🟥This is an overview of how to set up and use DataStore3 in your Roblox experiences

Welcome to DataStore3 👋 This is an overview of how to set up and use DataStore3 in your Roblox experiences What is it? 🤔 DataStore3 is a service tha

Reece Harris 7 Aug 19, 2022
tool to automate exploitation of android degubg bridge vulnerability

DISCLAIMER DISCLAIMER: ANY MALICIOUS USE OF THE CONTENTS FROM THIS ARTICLE WILL NOT HOLD THE AUTHOR RESPONSIBLE HE CONTENTS ARE SOLELY FOR EDUCATIONAL

6 Feb 12, 2022
ROS Foxy + Raspi + Adafruit BNO055

ROS Foxy + Raspi + Adafruit BNO055

Ar-Ray 3 Nov 04, 2022
Lookup for interesting stuff in SMB shares

SMBSR - what is that? Well, SMBSR is a python script which given a CIDR/IP/IP_file/HOSTNAME(s) enumerates all the SMB services listening (445) among t

Vincenzo 112 Dec 15, 2022
Two predictive attributes (Speed and Angle) and one attribute target (Power)

Two predictive attributes (Speed and Angle) and one attribute target (Power). A container crane has the function of transporting containers from one point to another point. The difficulty of this tas

Astitva Veer Garg 1 Jan 11, 2022
Imports an object based on a string import_string('package.module:function_name')() - Based on werkzeug.utils

DEPRECATED don't use it. Please do: import importlib foopath = 'src.apis.foo.Foo' module_name = '.'.join(foopath.split('.')[:-1]) # to get src.apis.f

Bruno Rocha Archived Projects 11 Nov 12, 2022
Slimbook Battery 4 is the new version with new features that improves battery control and increases battery duration in laptops.

Slimbookbattery Slimbook Battery 4 is the new version with new features that improves battery control and increases battery duration in laptops. This

SLIMBOOK TEAM 128 Dec 28, 2022
Async Python Circuit Breaker implementation

aiocircuitbreaker This is an async Python implementation of the circuitbreaker library. Installation The project is available on PyPI. Simply run: $ p

5 Sep 05, 2022
You can change your mac address with this program.

1 - Warning! You can use this program with Kali Linux. Therefore if you don't install the Kali Linux. Firstly you need to install Kali Linux. 2 - Star

Mustafa Bahadır Doğrusöz 1 Jun 10, 2022
This project is about for notifying moderators about uploaded photos on server.

This project is about for notifying moderators (people who moderate data from photos) about uploaded photos on server.

1 Nov 24, 2021
A simple 3D rigid body simulation written in python

pyRigidBody3d A simple 3D rigid body simulation written in python

30 Oct 07, 2022
Moleey Panel with python 3

Painel-Moleey pkg upgrade && pkg update pkg install python3 pip install pyfiglet pip install colored pip install requests pip install phonenumbers pkg

Moleey. 1 Oct 17, 2021
DG - A(n) (unusual) programming language

DG - A(n) (unusual) programming language General structure There are no infix-operators (i.e. 1 + 1) Each operator takes 2 parameters When there are m

1 Mar 05, 2022
Fly DCS without a joystick

Intro Usage Delete all mouse view axis Install DCSEasyControlExports to your "Saved Games/DCS/" Path python DCSEasyControl/main.py Set DCS to F12 view

XuHao 36 Dec 27, 2022
A browser login credentials thief for windows and Linux

Thief 🦹🏻 A browser login credentials thief for windows and Linux Python script to decrypt login credentials from browsers in windows or linux Decryp

Ash 1 Dec 13, 2021
Plugin to manage site, circuit and device diagrams and documents in Netbox

Netbox Documents Plugin A plugin designed to faciliate the storage of site, circuit and device specific documents within NetBox Note: Netbox v3.2+ is

Jason Yates 38 Dec 24, 2022
Data wrangling & common calculations for results from qMem measurement software

qMem Datawrangler This script processes output of qMem measurement software into an Origin ® compatible *.csv files and matplotlib graphs to quickly v

Julian 1 Nov 30, 2021
Service for working with open data of the State Duma of the Russian Federation

Сервис для работы с открытыми данными Госдумы РФ Исходные данные из API Госдумы РФ извлекаются с помощью Apache Nifi и приземляются в хранилище Clickh

Aleksandr Sergeenko 2 Feb 14, 2022