Combining ResourceT with bracket in a streaming pipeline

Time:02-12

Here is a simplification of my code:

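import Control.Exception (bracket)
import Control.Monad.Trans.Resource (MonadResource, runResourceT)
import Data.Function ((&))
import Database.PostgreSQL.Simple (Connection)
import qualified Streaming.Prelude as S
import Streaming.ByteString.Char8 as C
import Streaming.Zip (gunzip)
import Streaming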

main :: IO ()
main = do
  res <- runResourceT $ calculateA myLinesStream
  return ()

type MyLinesStream m r = S.Stream (S.Of String) m r

connect :: IO Connection
connect = undefined

close :: Connection -> IO ()
close = undefined

calculateA :: MonadIO m => MyLinesStream m r -> m ()
calculateA stream = liftIO (bracket connect close (go stream))
  where
    go :: MonadIO m => MyLinesStream m r -> Connection -> m ()
    go stream conn = stream & S.length_ >>= liftIO . print

myLinesStream :: (MonadIO m, MonadResource m) => MyLinesStream m ()
myLinesStream = do
  S.each ["1.zip", "2.zip"]
    & S.mapM (\fileName -> C.readFile fileName & gunzip)
    & S.mconcat
    & C.lines
    & mapsM (S.toList . C.unpack)
    & void

There is a type error on the following line, at the go stream expression:

calculateA stream = liftIO (bracket connect close (go stream))

The error says:

Couldn't match type ‘m’ with ‘IO’
  ‘m’ is a rigid type variable bound by
    the type signature for:
      calculateA :: forall (m :: * -> *) r.
                    MonadIO m =>
                    MyLinesStream m r -> m ()
Expected type: Connection -> IO ()
    Actual type: Connection -> m ()

Questions

  1. How can I make this code typecheck while still releasing resources safely in the calculateA function?
  2. I'm reading multiple files using C.readFile and then running the whole thing inside runResourceT. Will this properly release all the file handles?
  3. Is the composition good? (Note that I need the calculateA function to stay separate from myLinesStream.)

Answer:

The problem is that you are trying to use bracket in a monad that is too general. bracket has the signature:

bracket :: IO a -> (a -> IO b) -> (a -> IO c) -> IO c

It takes IO actions as parameters. However, the go function that you pass to bracket needs to work in a generic base monad m chosen by the caller of calculateA (you later make that monad be ResourceT IO in main).
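To see the mismatch in isolation, here is a minimal sketch that only needs base. The names acquire, dispose, inIO and lifted are made up for illustration, and a String stands in for the Connection. It is not a fix for calculateA, just a demonstration of what bracket accepts and what it rejects.

```haskell
import Control.Exception (bracket)
import Control.Monad.IO.Class (MonadIO, liftIO)

-- Hypothetical stand-ins for connect/close; a String plays the Connection.
acquire :: IO String
acquire = pure "conn"

dispose :: String -> IO ()
dispose _ = pure ()

-- Accepted: the body is an IO action, matching bracket's third argument.
inIO :: IO Int
inIO = bracket acquire dispose (\c -> pure (length c))

-- Also accepted: the whole bracket is built in IO, then lifted into m.
lifted :: MonadIO m => m Int
lifted = liftIO inIO

-- Rejected if uncommented: body has type String -> m Int for a
-- caller-chosen m, but bracket expects String -> IO Int.
-- generic :: MonadIO m => (String -> m Int) -> m Int
-- generic body = liftIO (bracket acquire dispose body)

main :: IO ()
main = do
  inIO >>= print
  lifted >>= print
```

The commented-out generic has the same shape as calculateA: liftIO can move a finished IO action into m, but it cannot turn an m action into the IO action that bracket needs.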

The bracket from base and ResourceT don't mix very well. Instead, you need to turn to special functions from the resourcet package like allocate and release, and use them to define a helper like:

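import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Resource (MonadResource, allocate, release)
import Streaming (Stream)

bracketStream :: (Functor f, MonadResource m)
              => IO a
              -> (a -> IO ())
              -> (a -> Stream f m b)
              -> Stream f m b
bracketStream alloc free inside = do
  -- Register the cleanup with ResourceT so it also runs on abnormal exit.
  (key, seed) <- lift (allocate alloc free)
  r <- inside seed
  -- Normal exit: release eagerly once the stream is exhausted.
  release key
  pure r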

How does it work? If you have a stream of Xs, it prepends an allocation action at the beginning of the stream (registering the corresponding cleanup action to be called in case of abnormal termination, like exceptions) and it also adds an explicit call to the cleanup action when the stream is exhausted:

(allocate register cleanup) X X X ... X (cleanup)
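As a rough sketch of how calculateA could be rewritten on top of this helper: the Conn type and the printing connect/close below are hypothetical stand-ins so the example runs without a database, and the constraint on calculateA is strengthened from MonadIO to MonadResource, which is an assumption about what your callers can provide.

```haskell
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Resource (MonadResource, allocate, release, runResourceT)
import Streaming (Of, Stream)
import qualified Streaming.Prelude as S

-- The helper from the answer, repeated so the sketch is self-contained.
bracketStream :: (Functor f, MonadResource m)
              => IO a -> (a -> IO ()) -> (a -> Stream f m b) -> Stream f m b
bracketStream alloc free inside = do
  (key, seed) <- lift (allocate alloc free)
  r <- inside seed
  release key
  pure r

-- Hypothetical stand-in for Database.PostgreSQL.Simple's Connection.
newtype Conn = Conn String

connect :: IO Conn
connect = putStrLn "connect" >> pure (Conn "db")

close :: Conn -> IO ()
close (Conn name) = putStrLn ("close " ++ name)

-- The connection lives exactly as long as the wrapped stream is being consumed.
calculateA :: MonadResource m => Stream (Of String) m r -> m ()
calculateA stream = do
  n <- S.length_ (bracketStream connect close (\_conn -> stream))
  liftIO (print n)

main :: IO ()
main = runResourceT (calculateA (S.each ["a", "b", "c"]))
```

Note that here the connection is released as soon as the stream is exhausted, before the count is printed. If each element needs the connection, that per-element work would go inside the function passed to bracketStream (for example with S.mapM) rather than after it.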

You wrote:

I'm reading multiple files using C.readFile and then wrapping it inside runResourceT. Will this properly release all the file handles?

Yes. With ResourceT, resources are freed either when an explicit cleanup action is performed, or when we "exit" the ResourceT with runResourceT (perhaps abnormally, with an exception).
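The abnormal-exit case can be checked with a small experiment. This is only a sketch: the failing action and its messages are invented for illustration, and the resource is a dummy that just prints.

```haskell
import Control.Exception (ErrorCall (..), throwIO, try)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Resource (ResourceT, allocate, runResourceT)

-- Registers a cleanup and then fails without ever calling release.
failing :: ResourceT IO ()
failing = do
  _ <- allocate (putStrLn "acquire") (\_ -> putStrLn "cleanup on exit")
  liftIO (throwIO (ErrorCall "boom"))

main :: IO ()
main = do
  -- runResourceT runs the registered cleanup, then rethrows the exception.
  result <- try (runResourceT failing)
  case result of
    Left (ErrorCall msg) -> putStrLn ("caught: " ++ msg)
    Right ()             -> putStrLn "finished normally"
```

The cleanup message should appear before the "caught" line, since runResourceT runs all still-registered cleanup actions on its way out.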

So, if we read a stream of Xs followed by a stream of Ys, we would have:

(allocate register cleanup) X X X ... X (cleanup) (allocate register cleanup) Y Y Y ... Y (cleanup)

That is, the resource that produces Xs would be released before allocating the resource that produces Ys.
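The ordering can be observed with two dummy resources chained one after the other. Again a sketch under assumptions: xsThenYs and the "open"/"close" messages are invented, and the resources are just unit values that print when acquired and released.

```haskell
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Resource (MonadResource, allocate, release, runResourceT)
import Streaming (Of, Stream)
import qualified Streaming.Prelude as S

-- The helper from the answer, repeated so the sketch is self-contained.
bracketStream :: (Functor f, MonadResource m)
              => IO a -> (a -> IO ()) -> (a -> Stream f m b) -> Stream f m b
bracketStream alloc free inside = do
  (key, seed) <- lift (allocate alloc free)
  r <- inside seed
  release key
  pure r

-- A stream of Xs followed by a stream of Ys, each guarded by its own resource.
xsThenYs :: MonadResource m => Stream (Of String) m ()
xsThenYs = do
  bracketStream (putStrLn "open X") (\_ -> putStrLn "close X")
                (\_ -> S.each ["x1", "x2"])
  bracketStream (putStrLn "open Y") (\_ -> putStrLn "close Y")
                (\_ -> S.each ["y1", "y2"])

main :: IO ()
main = runResourceT (S.mapM_ (liftIO . putStrLn) xsThenYs)
```

When the stream is consumed to the end, "close X" is printed before "open Y", matching the diagram above. If the consumer stops early or throws, the pending cleanup is deferred to runResourceT instead.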
