try_join to make mongodb transactions sent at the same time

Time:10-08

I'm new to Rust and I'm using the default MongoDB driver https://docs.rs/mongodb/2.0.0/mongodb/

I remember that in Node.js it was possible to send the operations of a transaction with Promise.all(), so that they all execute at the same time for optimization purposes, and then to commit the transaction if there were no errors. (Node.js example here: https://medium.com/@alkor_shikyaro/transactions-and-promises-in-node-js-ca5a3aeb6b74)

I'm now trying to implement the same logic in Rust using try_join!, but I always run into this problem:

error: cannot borrow session as mutable more than once at a time; label: first mutable borrow occurs here

use mongodb::{bson::{doc, oid::ObjectId, Document}, options, Client, Database};
use std::time::Duration; // needed for w_timeout and max_commit_time below
use async_graphql::{
    validators::{Email, StringMaxLength, StringMinLength},
    Context, ErrorExtensions, Object, Result,
};
use futures::try_join;
//use tokio::try_join; -> same thing

#[derive(Default)]
pub struct UserMutations;

#[Object]
impl UserMutations {


async fn user_followed<'ctx>(
        &self,
        ctx: &Context<'ctx>,
        other_user_id: ObjectId,
        current_user_id: ObjectId,
    ) -> Result<bool> {

    let mut session = Client::with_uri_str(dotenv!("URI"))
        .await
        .expect("DB not accessible!")
        .start_session(Some(session_options))
        .await?;

    session.start_transaction(Some(options::TransactionOptions::builder()
            .read_concern(Some(options::ReadConcern::majority()))
            .write_concern(Some(
                options::WriteConcern::builder()
                    .w(Some(options::Acknowledgment::Majority))
                    .w_timeout(Some(Duration::new(3, 0)))
                    .journal(Some(false))
                    .build(),
            ))
            .selection_criteria(Some(options::SelectionCriteria::ReadPreference(
                options::ReadPreference::Primary
            )))
            .max_commit_time(Some(Duration::new(3, 0)))
            .build())).await?; 
    
   
    let db = Client::with_uri_str(dotenv!("URI"))
        .await
        .expect("DB not accessible!").database("database").collection::<Document>("collection");

             try_join!(
                db.update_one_with_session(
                    doc! {
                        "_id": other_user_id
                    },
                    doc! {
                        "$inc": { "following_number": -1 }
                    },
                    None,
                    &mut session,
                ),
                db.update_one_with_session(
                    doc! {
                        "_id": current_user_id
                    },
                    doc! {
                        "$inc": { "followers_number": -1 }
                    },
                    None,
                    &mut session,
                )
            )?;
    
    Ok(true)
  }
}

849 | |                     &mut session,
    | |                     ------------ first mutable borrow occurs here
...   |
859 | |                     &mut session,
    | |                     ^^^^^^^^^^^^ second mutable borrow occurs here
860 | |                 )
861 | |             )?;
    | |_____________- first borrow later captured here by closure

Is there any way to send the transaction's operations concurrently, so that no time is lost on independent mutations? Does anyone have any ideas? Thanks in advance!

CodePudding user response:

This limitation is actually by design. In MongoDB, client sessions cannot be used concurrently, so the Rust driver takes them as &mut to prevent this at compile time. The Node example only works by chance and is definitely not recommended or supported behavior. If you would like to perform both updates as part of a transaction, you'll have to run one update after the other. If you'd like to run them concurrently, you'll need to execute them without a session or transaction.

As a side note, a client session can only be used with the client that it was created from. In the provided example, the session is used with a collection obtained from a different client, which will cause an error.
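
To make the "one update after the other" advice concrete, here is a minimal driver-free sketch. FakeSession, unfollow and block_on are hypothetical names invented for this illustration, not part of the mongodb crate; the fake session only records operations in memory. In the real driver the corresponding calls would be update_one_with_session on the collection followed by commit_transaction on the session, all using the client that created the session.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for a driver session: it only records operations.
#[derive(Default)]
struct FakeSession {
    pending: Vec<String>,
    committed: Vec<String>,
}

impl FakeSession {
    // `&mut self` mirrors the driver's `&mut ClientSession` parameter:
    // the borrow checker allows only one in-flight operation per session.
    async fn update_one(&mut self, id: &str, delta: i64) -> Result<(), String> {
        self.pending.push(format!("{}:{}", id, delta));
        Ok(())
    }

    // Moves every pending operation into the committed list.
    async fn commit(&mut self) -> Result<(), String> {
        self.committed.append(&mut self.pending);
        Ok(())
    }
}

/// Runs both updates in order on the same session, then commits.
/// Each `.await` finishes before the next mutable borrow starts.
async fn unfollow(session: &mut FakeSession) -> Result<(), String> {
    session.update_one("other_user", -1).await?;
    session.update_one("current_user", -1).await?;
    session.commit().await
}

// Tiny executor so the sketch runs with the standard library alone.
// It simply polls in a loop, which is enough for futures that never wait.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn main() {
    let mut session = FakeSession::default();
    block_on(unfollow(&mut session)).expect("transaction failed");
    println!("{:?}", session.committed);
}
```

Passing both update_one futures to try_join! instead would need two live &mut borrows of the same session, which is exactly the compile error from the question.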

CodePudding user response:

Just an idea :-)

You can reproduce the error like this:

use futures::{try_join};

async fn update_one(session: &mut String) -> Result<(), &'static str>
{
    *session = String::from("ok change");
    Ok(())
}

#[tokio::main]
async fn main() -> ()
{
    let mut s = String::from("hello");
    try_join!(update_one(&mut s), update_one(&mut s));
}

And you can try to work around the problem like this:

use futures::{try_join};
use std::sync::{Arc, Mutex};

async fn update_one(session: &mut String) -> Result<(), &'static str>
{
    *session = String::from("ok change");
    Ok(())
}

async fn wrap_update_one(s: &Arc<Mutex<String>> ) -> Result<(), &'static str>
{
    let m = Arc::clone(&s);
    let mut a = m.lock().unwrap();
    
    Ok(update_one(&mut a).await?)
}

#[tokio::main]
async fn main() -> ()
{
    let s = Arc::new(Mutex::new(String::from("hello")));
    try_join!(wrap_update_one(&s), wrap_update_one(&s));
    
    let m = Arc::clone(&s);
    let a = m.lock().unwrap();
    println!("{}", a);
    ()
}

But the Mutex lock serializes the two calls, so the result is the same as running them one after the other without any concurrency.
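
A small sketch of that serialization effect, assuming plain threads instead of futures so that it runs with the standard library alone (run_locked_updates is a hypothetical name for this illustration). Each worker holds the lock for its whole "update", so the log always shows one update finishing before the other starts, whichever thread wins the lock first.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Starts two workers that each hold the lock for their whole update
/// and returns the log of events in the order they were recorded.
fn run_locked_updates() -> Vec<String> {
    let log = Arc::new(Mutex::new(Vec::new()));

    let handles: Vec<_> = (0..2)
        .map(|n| {
            let log = Arc::clone(&log);
            thread::spawn(move || {
                // The guard is held until the end of the closure, so the
                // other worker cannot record anything in between.
                let mut guard = log.lock().unwrap();
                guard.push(format!("start {}", n));
                thread::yield_now(); // give the other thread a chance to run
                guard.push(format!("end {}", n));
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }

    let events = log.lock().unwrap().clone();
    events
}

fn main() {
    // Every "start n" is immediately followed by its own "end n".
    for event in run_locked_updates() {
        println!("{}", event);
    }
}
```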
