Implement a thread-safe function object in Rust

Time:05-16

I am trying to implement some of the stability patterns introduced in cloud-native-go (e.g. https://github.com/cloud-native-go/examples/blob/main/ch04/circuitbreaker.go) in Rust. The basic idea is to wrap a function in a closure that keeps some state about the function calls. I also want my implementation to be thread safe.

I finally came up with something like this:

use std::sync::Arc;
use std::sync::Mutex;
use std::thread;

#[derive(PartialEq, Debug, Clone)]
pub(crate) enum Error {
    UnReachable,
    CircuitError,
}

pub(crate) type Circuit = Arc<Mutex<dyn Fn() -> Result<String, Error>>>;

pub(crate) fn fail_after(threshold: usize) -> Circuit {
    let cnt = Arc::new(Mutex::new(0));
    let f = move || {
        let mut c = cnt.lock().unwrap();
        *c += 1;
        if *c > threshold {
            return Err(Error::CircuitError);
        } else {
            return Ok("ok".to_owned());
        }
    };
    return Arc::new(Mutex::new(f));
}

#[test]
fn test_debounce_first() {
    let c = fail_after(1);
    assert!(c.lock().unwrap()().is_ok());
    assert!(c.lock().unwrap()().is_err());
}


#[test]
fn test_debounce_first_data_race() {
    let c = fail_after(1);
    let mut results = vec![];
    for _ in 0..10 {
        let d1 = Arc::clone(&c);
        let h = thread::spawn(move || {
            let r: Result<String, Error> = d1.lock().unwrap()();
            r
        });
        results.push(h)
    }

    for result in results {
        let r = result.join().unwrap();
        assert!(r.is_ok())
    }
}

(rust playground)

The multi-threaded test gives a compile error:

error[E0277]: `(dyn Fn() -> Result<String, Error> + 'static)` cannot be sent between threads safely
   --> src/lib.rs:41:17
    |
41  |         let h = thread::spawn(move || {
    |                 ^^^^^^^^^^^^^ `(dyn Fn() -> Result<String, Error> + 'static)` cannot be sent between threads safely
    |
    = help: the trait `Send` is not implemented for `(dyn Fn() -> Result<String, Error> + 'static)`
    = note: required because of the requirements on the impl of `Sync` for `Mutex<(dyn Fn() -> Result<String, Error> + 'static)>`
    = note: required because of the requirements on the impl of `Send` for `Arc<Mutex<(dyn Fn() -> Result<String, Error> + 'static)>>`
    = note: required because it appears within the type `[closure@src/lib.rs:41:31: 44:10]`
note: required by a bound in `spawn`

How can I resolve this? I have already wrapped the function object in Arc and Mutex, so why does the compiler still complain?

CodePudding user response:

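Short answer: change dyn Fn() -> Result<String, Error> to dyn Fn() -> Result<String, Error> + Send + 'static. The original definition of the type makes no guarantee that the dyn Fn can be sent between threads safely, and Mutex<T> is only Sync (and Arc<Mutex<T>> therefore only Send) when T is Send. The 'static bound is already the default in this position, as the error message shows, so spelling it out is optional.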
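
A minimal sketch of that change, assuming the rest of the question's code stays as it is. The helper count_ok_across_threads is a hypothetical name that does not appear in the question, and the Error enum is trimmed to the one variant the sketch uses. With fail_after(1) only one of the ten calls can succeed, so the sketch counts successes instead of asserting that every call is ok.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

#[derive(PartialEq, Debug, Clone)]
pub enum Error {
    CircuitError,
}

// `+ Send` is the addition: Mutex<T> is Sync only when T is Send,
// and Arc<Mutex<T>> is Send only when Mutex<T> is Send + Sync.
pub type Circuit = Arc<Mutex<dyn Fn() -> Result<String, Error> + Send + 'static>>;

pub fn fail_after(threshold: usize) -> Circuit {
    let cnt = Arc::new(Mutex::new(0usize));
    let f = move || {
        let mut c = cnt.lock().unwrap();
        *c += 1;
        if *c > threshold {
            Err(Error::CircuitError)
        } else {
            Ok("ok".to_owned())
        }
    };
    // The concrete closure type coerces to the unsized alias at the return site.
    Arc::new(Mutex::new(f))
}

// Hypothetical helper: call the circuit once from each of `threads` threads
// and count how many calls returned Ok.
pub fn count_ok_across_threads(circuit: &Circuit, threads: usize) -> usize {
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let c = Arc::clone(circuit);
            thread::spawn(move || {
                let guard = c.lock().unwrap();
                (*guard)()
            })
        })
        .collect();
    handles
        .into_iter()
        .map(|h| h.join().unwrap())
        .filter(|r| r.is_ok())
        .count()
}

#[test]
fn test_fail_after_across_threads() {
    let c = fail_after(1);
    // Exactly one call stays within the threshold, whichever thread gets there first.
    assert_eq!(count_ok_across_threads(&c, 10), 1);
}
```

The lock guard is bound to a local before the call so that it is dropped before the Arc it borrows from, which is the same reason the question's test stores the result in r before returning it.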

Another note: you may have to modify it further to Box<dyn Fn() -> Result<String, Error> + Send + 'static>, since I see you're constructing your Mutex with new, which requires the type it contains to be Sized, whereas dyn Trait is unsized. I have never tried constructing a mutex with an unsized type, so you may need to look into that further.

Edit

You don't need to box the dyn Fn() -> _ because the Arc<Mutex<_>> is initially constructed with the concrete closure, but then coerces to the unsized Mutex<dyn Fn() -> _> due to the third blanket implementation mentioned here and Arc's implementation of CoerceUnsized.
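
The coercion described in the edit can be sketched in isolation as follows. This is an illustration only: SharedFn, make_shared and call_shared are hypothetical names, and the closure returns a plain usize to keep the example short.

```rust
use std::sync::{Arc, Mutex};

// Unsized target type: no Box around the trait object.
type SharedFn = Arc<Mutex<dyn Fn() -> usize + Send>>;

// Hypothetical helper: build the value from a concrete closure.
fn make_shared(n: usize) -> SharedFn {
    // Here the type is Arc<Mutex<{closure}>>, which is Sized,
    // so Mutex::new and Arc::new are both satisfied.
    let concrete = Arc::new(Mutex::new(move || n * 2));
    // Returning it is a coercion site: Arc<Mutex<{closure}>> is
    // unsized to Arc<Mutex<dyn Fn() -> usize + Send>>.
    concrete
}

// Hypothetical helper: lock the mutex and call the closure behind it.
fn call_shared(f: &SharedFn) -> usize {
    let guard = f.lock().unwrap();
    (*guard)()
}

#[test]
fn test_unsized_coercion() {
    assert_eq!(call_shared(&make_shared(21)), 42);
    // A type annotation on a let binding is a coercion site as well.
    let s: SharedFn = Arc::new(Mutex::new(|| 7usize));
    assert_eq!(call_shared(&s), 7);
}
```

Mutex::new is only ever called with a Sized closure type; the dyn Fn form appears only after the coercion, which is why the Sized requirement on new is not a problem.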
