Home > Software engineering >  Efficiently upsert thousands of records to mongodb
Efficiently upsert thousands of records to mongodb

Time:05-08

We have a database containing around ~20 Million records with an indexed field order_id.

Every day, after each hour, we receive an incremental update, starting from ~2K and growing to ~50K records which some might be new, and others are updates to previous records.

To process these data, I have created a pipeline using Airflow. And before pushing the data to MongoDB, The data is available as a Pandas dataframe as I'm using it to process and clean the data.

For now, I'm using this code to upsert the data. But I'm not sure if it's the correct or efficient solution or not.

from pymongo import ReplaceOne

col = MongoDB.get_collection("collection", mongo_db="database")

# My processed data
df

df.reset_index()

bulk_data = [
    ReplaceOne({"order_id": row["order_id"]}, dict(row), upsert=True)
    for index, row in df.iterrows()
]

col.bulk_write(bulk_data)

So what might be the other options? Is this way of doing the task logical or I'm doing it wrong?

CodePudding user response:

You have a mostly efficient technique; your query field is indexed and you are using bulk operations. I would be surprised if this is running slowly, even on 50k records.

If you want to squeeze the last drop of performance out, this approach may be quicker. It deletes all the incoming records and re-inserts them; generally this is more performant than using ReplaceOne() with upsert=True. Also the to_dict() method in pandas removes a step compared to using iterrows(). Finally you can turn the bulk ordered option to False which again is a perfomance gain, as you presumably don't care abiut the order of the inserts.

col.delete_many({"order_id": {'$in': list(df["order_id"])}})
bulk_data = [InsertOne(row) for row in df.to_dict(orient='records')]   
col.bulk_write(bulk_data, ordered=False)
  • Related