How to atomically modify a value within a mutable array?

Time:10-12

I have this mutable array in an ST monad. And I have this loop function.

runST $ do
    myarray <- newMArray (Sz 10) 0
    loopM_ 0 (<10) (+1) (\j ->
      loopM_ 0 (<10) (+1) (\i ->
        when (mytruthcheck j i)
             (modifyM_ myarray (pure . (+1)) ((funcofji j i) :: Int)
     )))

I want to use forkST_ to run the outer loop in parallel like this.

runST $ do
   myarray <- newMArray (Sz 10) 0
   loopM_ 0 (<10) (+1) (\j ->
     void (forkST_ (loopM_ 0 (<10) (+1) (\i ->
       when (mytruthcheck j i)
            (Data.Massiv.Array.Mutable.modifyM_
                 myarray (pure . (+1)) ((funcofji j i) :: Int)
    )))))

I suspect this will cause collisions between threads, but I am not sure. I do know that funcofji can return the same value for different values of j, so the loop can modify the same index of myarray from different threads. Is there a way to ensure the modification is done atomically, or is that already the case?

Btw here’s the loopM_ function

loopM_ :: Monad m => Int -> (Int -> Bool) -> (Int -> Int) -> (Int -> m a) -> m ()
loopM_ !init' condition increment f = go init'
  where
    go !step
      | condition step = f step >> go (increment step)
      | otherwise = pure ()

CodePudding user response:

As was mentioned in the comments, atomic modification is what is needed here, because several threads can update the same index at the same time. It is available in massiv for Ints: atomicAddIntArray

There is also a built-in way to do parallelism in massiv very efficiently, so there is definitely no need to reinvent the wheel:

  createArray_ Par (Sz 10) $ \scheduler myarray ->
    loopM_ 0 (<10) (+1) $ \j ->
      loopM_ 0 (<10) (+1) $ \i ->
        when (mytruthcheck j i) $
          scheduleWork_ scheduler $
            void $ atomicAddIntArray myarray ((funcofji j i) :: Int) 1

Also, don't lie to yourself: ST (state thread) was not built for multi-threading, so use IO instead. However, if you can guarantee that, despite the multi-threaded setup, the outcome is still deterministic in the end, then it is OK to use unsafePerformIO.
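To make the two points above concrete without depending on massiv, here is a minimal sketch that uses only base. It is not how massiv implements atomicAddIntArray: a list of IORef counters stands in for the mutable array, atomicModifyIORef' stands in for the atomic add, and truthCheck and bucketOf are hypothetical stand-ins for mytruthcheck and funcofji. The work runs in IO, and the pure wrapper uses unsafePerformIO on the assumption that the final counts do not depend on thread interleaving, which holds here because addition is commutative and every increment is atomic.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Monad (forM_, replicateM, replicateM_, when)
import Data.IORef (atomicModifyIORef', newIORef, readIORef)
import System.IO.Unsafe (unsafePerformIO)

-- Hypothetical stand-ins for the question's mytruthcheck and funcofji.
truthCheck :: Int -> Int -> Bool
truthCheck j i = even (j + i)

bucketOf :: Int -> Int -> Int
bucketOf j i = (j * i) `mod` 10

-- Count hits per bucket, running one thread per outer index j.
histogramIO :: Int -> Int -> IO [Int]
histogramIO outer inner = do
  counters <- replicateM 10 (newIORef (0 :: Int))
  done <- newEmptyMVar
  forM_ [0 .. outer - 1] $ \j -> forkIO $ do
    forM_ [0 .. inner - 1] $ \i ->
      when (truthCheck j i) $
        -- Atomic read-modify-write: concurrent increments are never lost.
        atomicModifyIORef' (counters !! bucketOf j i) (\n -> (n + 1, ()))
    putMVar done ()
  replicateM_ outer (takeMVar done) -- wait for every worker to finish
  mapM readIORef counters

-- Pure wrapper; acceptable only because the result is deterministic.
histogram :: Int -> Int -> [Int]
histogram outer inner = unsafePerformIO (histogramIO outer inner)
{-# NOINLINE histogram #-}
```

In GHCi, `histogram 10 10` returns the ten bucket counts. Replacing atomicModifyIORef' with a plain readIORef followed by writeIORef would reintroduce the lost-update race the question is worried about.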

Edit

I just noticed this comment:

The loop size of j is close to 100,000 and the loop size of i is close to 1 Billion.

Which makes me believe that it will be better to parallelize it in this way instead:

  createArray_ Par (Sz 10) $ \scheduler myarray ->
    iforSchedulerM_ scheduler (0 ..: (100000 :. 1000000000)) $ \_ (j :. i) ->
      when (mytruthcheck j i) $ 
        void $ atomicAddIntArray myarray ((funcofji j i) :: Int) 1

This will ensure that you only schedule a few jobs, instead of billions. Check out the implementation of iforSchedulerM_ if you want to customize its parallelization, in case you have more insight into your per-i and per-j workload.

CodePudding user response:

As a result of the discussion in the comments, I wrote a prototype of how this could look. Maybe it will be useful (I didn't try to compile it, so there may be some type/syntax errors).

runST $ do
    let arrsz = 10 :: Int -- depends on codomain of funcofji
    let ncaps = 8 :: Int64 -- see also getNumCapabilities
    let outerLoopSize = 10^5  :: Int64
    let innerLoopSize = 10^9 :: Int64 -- close to 1 billion
    let chunksz = outerLoopSize `div` ncaps
    
    sync <- newEmptyMVar
    forM_ [0 .. ncaps - 1] $ \k -> forkST_ $ do
        localArr <- newMArray (Sz arrsz) 0
        forM_ [k * chunksz .. min outerLoopSize ((k + 1) * chunksz) - 1] $ \j -> do
            forM_ [0 .. innerLoopSize - 1] $ \i -> do
                when (mytruthcheck j i) $
                    modifyM_ localArr (pure . (+1)) $ funcofji j i
        putMVar sync localArr

    resultArr <- takeMVar sync
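    -- replicateM_ expects an Int, while ncaps is an Int64
    replicateM_ (fromIntegral ncaps - 1) $ do
        localArr <- takeMVar sync
        -- add this worker's local counts into the result
        forM_ [0 .. arrsz - 1] $ \ix -> do
            elm <- readM localArr ix
            modifyM_ resultArr (pure . (+ elm)) ix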

    ...
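For comparison, here is a compilable sketch of the same idea (private per-worker counters, merged once at the end) written in IO with only base. It is an illustration under stated assumptions, not a translation of the prototype: lists of IORef replace the massiv arrays, forkIO replaces forkST_, truthCheck and bucketOf are hypothetical stand-ins for mytruthcheck and funcofji, and chunks is a helper introduced here. The helper rounds the chunk size up so that no outer index is dropped when the loop size is not a multiple of the worker count.

```haskell
import Control.Concurrent (forkIO, getNumCapabilities)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Monad (forM_, replicateM, when)
import Data.IORef (modifyIORef', newIORef, readIORef)
import Data.List (foldl')

-- Number of buckets; depends on the codomain of bucketOf.
arrSize :: Int
arrSize = 10

-- Hypothetical stand-ins for the question's mytruthcheck and funcofji.
truthCheck :: Int -> Int -> Bool
truthCheck j i = even (j + i)

bucketOf :: Int -> Int -> Int
bucketOf j i = (j * i) `mod` arrSize

-- Split [0, n) into at most k contiguous half-open ranges (k >= 1).
chunks :: Int -> Int -> [(Int, Int)]
chunks k n =
  [ (lo, hi)
  | c <- [0 .. k - 1]
  , let lo = c * size
  , let hi = min n (lo + size)
  , lo < hi
  ]
  where
    size = (n + k - 1) `div` k -- round up

-- Process one range of j with counters private to this worker.
worker :: Int -> (Int, Int) -> IO [Int]
worker inner (lo, hi) = do
  local <- replicateM arrSize (newIORef (0 :: Int))
  forM_ [lo .. hi - 1] $ \j ->
    forM_ [0 .. inner - 1] $ \i ->
      when (truthCheck j i) $
        -- No atomics needed: no other thread can see these counters.
        modifyIORef' (local !! bucketOf j i) (+ 1)
  mapM readIORef local

histogramChunked :: Int -> Int -> IO [Int]
histogramChunked outer inner = do
  ncaps <- getNumCapabilities
  let ranges = chunks ncaps outer
  results <- newEmptyMVar
  forM_ ranges $ \r -> forkIO (worker inner r >>= putMVar results)
  partials <- replicateM (length ranges) (takeMVar results)
  -- Merge the per-worker counts element by element.
  pure (foldl' (zipWith (+)) (replicate arrSize 0) partials)
```

The trade-off against the atomic version is memory for contention: each worker holds its own copy of the counters, but the hot inner loop never touches shared state. To actually run on several cores, the program has to be built with -threaded and started with +RTS -N.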