I'm experimenting with Rocket, Rust and SQLx, and I'd like to test what happens when two parallel transactions try to insert a duplicate record into my table.
My insert fn contains nothing special and it works fine:
async fn insert_credentials<'ex, EX>(&self, executor: EX, credentials: &Credentials) -> Result<u64, Errors>
where
EX: 'ex + Executor<'ex, Database = Postgres>,
{
sqlx::query!(
r#"INSERT INTO credentials (username, password)
VALUES ($1, crypt($2, gen_salt('bf')))"#,
credentials.username,
credentials.password,
)
.execute(executor)
.await
.map(|result| result.rows_affected())
.map_err(|err| err.into())
}
My test, though, hangs indefinitely since it waits for a commit that never happens:
#[async_std::test]
async fn it_should_reject_duplicated_username_in_parallel() {
let repo = new_repo();
let db: Pool<Postgres> = connect().await;
let credentials = new_random_credentials();
println!("TX1 begins");
let mut tx1 = db.begin().await.unwrap();
let rows_affected = repo.insert_credentials(&mut tx1, &credentials).await.unwrap();
assert_eq!(rows_affected, 1);
println!("TX2 begins");
let mut tx2 = db.begin().await.unwrap();
println!("It hangs on the next line");
let rows_affected = repo.insert_credentials(&mut tx2, &credentials).await.unwrap();
assert_eq!(rows_affected, 1);
println!("It never reaches this line");
tx1.commit().await.unwrap();
tx2.commit().await.unwrap();
}
How do I create and execute those TXs in parallel, such that the assertions pass but the test fails when trying to commit the second TX?
For reference, this is my Cargo.toml
[package]
name = "auth"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
async-trait = "0.1.52"
serde = "1.0.136"
thiserror = "1.0.30"
# TODO https://github.com/SergioBenitez/Rocket/issues/1893#issuecomment-1002393878
rocket = { git = "https://github.com/SergioBenitez/Rocket", features = ["json"] }
[dependencies.redis]
version = "0.21.5"
features = ["tokio-comp"]
[dependencies.sqlx]
version = "0.5.11"
features = ["macros", "runtime-tokio-rustls", "postgres"]
[dependencies.uuid]
version = "1.0.0-alpha.1"
features = ["v4", "fast-rng", "macro-diagnostics"]
## DEV ##
[dev-dependencies]
mockall = "0.11.0"
[dev-dependencies.async-std]
version = "1.11.0"
features = ["attributes", "tokio1"]
CodePudding user response:
You can use async_std::future::timeout or tokio::time::timeout. Example using async_std:
use async_std::future;
use std::time::Duration;
let max_duration = Duration::from_millis(100);
assert!(future::timeout(max_duration, tx2.commit()).await.is_err());
If you want to proceed with tx2 before tx1 completes, you can spawn tx1 first with async_std::task::spawn or tokio::spawn:
async_std::task::spawn(async move {
assert!(tx1.commit().await.is_ok());
});
CodePudding user response:
@Mika pointed me in the right direction: I could spawn both transactions and add a short sleep to give the concurrent TXs some time to execute.
let handle1 = tokio::spawn(async move {
let repo = new_repo();
let mut tx = db1.begin().await.unwrap();
let rows_affected = repo.insert_credentials(&mut tx, &credentials1).await.unwrap();
assert_eq!(rows_affected, 1);
tokio::time::sleep(Duration::from_millis(100)).await;
tx.commit().await.unwrap()
});
let handle2 = tokio::spawn(async move {
let repo = new_repo();
let mut tx = db2.begin().await.unwrap();
let rows_affected = repo.insert_credentials(&mut tx, &credentials2).await.unwrap();
assert_eq!(rows_affected, 1);
tokio::time::sleep(Duration::from_millis(100)).await;
tx.commit().await.unwrap()
});
let (_first, _second) = rocket::tokio::try_join!(handle1, handle2).unwrap();
I expected both TXs to run in parallel up to the sleep line, then one to commit and the other to fail on the commit line. That is not what happens: both TXs do run in parallel, but TX1 runs up to the sleep while TX2 blocks on the insert line until TX1 commits, and then TX2 fails on the insert line.
I guess that's just how the DB works in this case, and maybe I could change that by messing with TX isolation, but that's not my intent here. I'm just playing to learn more, and that's enough learning for today :)
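For anyone who wants to see the blocking pattern described above without a database, here is a minimal sketch using only the standard library. It is a toy model and not how Postgres is implemented: UniqueIndex, Entry, insert, commit and demo are all hypothetical names made up for illustration. The assumption it encodes is the behaviour observed in the test: an uncommitted insert of a key makes a second inserter of the same key wait, and once the first transaction commits, the waiter wakes up and gets a duplicate-key error.

```rust
use std::collections::HashMap;
use std::sync::mpsc;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

// Toy model only: this is NOT how Postgres is implemented.
#[derive(Clone, Copy, PartialEq)]
enum Entry {
    Pending(u32), // written by a still-open transaction with this id
    Committed,
}

#[derive(Default)]
struct UniqueIndex {
    entries: Mutex<HashMap<String, Entry>>,
    changed: Condvar,
}

impl UniqueIndex {
    // Insert `key` for transaction `tx`; waits while another open
    // transaction holds an uncommitted entry for the same key.
    fn insert(&self, tx: u32, key: &str) -> Result<(), String> {
        let mut entries = self.entries.lock().unwrap();
        loop {
            let current = entries.get(key).copied();
            match current {
                None => {
                    entries.insert(key.to_string(), Entry::Pending(tx));
                    return Ok(());
                }
                Some(Entry::Pending(owner)) if owner != tx => {
                    // Outcome unknown until the owner finishes, so wait.
                    entries = self.changed.wait(entries).unwrap();
                }
                Some(_) => return Err(format!("duplicate key: {}", key)),
            }
        }
    }

    // Mark every pending entry of `tx` as committed and wake all waiters.
    fn commit(&self, tx: u32) {
        let mut entries = self.entries.lock().unwrap();
        for entry in entries.values_mut() {
            if *entry == Entry::Pending(tx) {
                *entry = Entry::Committed;
            }
        }
        self.changed.notify_all();
    }
}

// Returns (was TX2 still blocked before TX1 committed?, TX2's final result).
fn demo() -> (bool, Result<(), String>) {
    let index = Arc::new(UniqueIndex::default());
    index.insert(1, "alice").unwrap(); // TX1 inserts but does not commit yet

    let (sender, receiver) = mpsc::channel();
    let index2 = Arc::clone(&index);
    let worker = thread::spawn(move || {
        // TX2 inserts the same key from another thread.
        sender.send(index2.insert(2, "alice")).unwrap();
    });

    // Same idea as the timeout suggested earlier in this thread:
    // no result can arrive while TX1 is still open.
    let blocked = receiver.recv_timeout(Duration::from_millis(50)).is_err();

    index.commit(1); // TX1 commits, which wakes TX2
    let second = receiver.recv().unwrap();
    worker.join().unwrap();
    (blocked, second)
}
```

Calling demo() from main or from a test returns a pair: whether the second insert was still waiting before the first commit, and the second insert's result after that commit. In this model the second transaction fails on the insert itself, not on commit, which matches what the test above ran into.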