Home > Mobile >  Streaming pipeline using result in next step
Streaming pipeline using result in next step

Time:02-11

I'm using the streaming package. I want to use result of one step defined by the S.store as a parameter to a following step in the pipeline by preserving the constant memory. The myStream is loaded and parsed from a file.

I have a following example that typechecks:

import qualified Streaming.Prelude as S
import qualified Data.Map.Strict as M

data A = MkA deriving (Show)

insertA :: MonadIO m => S.Stream (S.Of A) m r -> m (M.Map String Int)
insertA = undefined

insertB :: MonadIO m => M.Map String Int -> S.Stream (S.Of A) m r -> m Int
insertB = undefined

myStream :: S.Stream (S.Of A) IO r
myStream = undefined

run :: IO ()
run =
  myStream
    & S.store insertA
    & insertB M.empty
    & print

However, the line & insertB M.empty is taking an empty map but I want to use the map from the previous step, from the insertA function. The insertB function then uses this Map for a lookup.

The solution I can think of is the following:

run :: IO ()
run =
  myStream
    & S.store insertA
    & ( \e -> do
          resultMap <- S.effects e
          insertB resultMap e
      )
    & print

Questions

Does this preserve streaming benefits like running in constant memory? How does it solve this in the background, since the stream needs to be processed as a whole to get the Map? It passes the same stream multiple times - loads it from a file 2 times to preserve the constant memory?

In case this would be the case (loads the file 2 times), what if the source of the stream was not from parsing a file but from some data stream that can be read only once?

Is there any other elegant solution to this problem that also holds the benefits of streaming where the next step in a pipeline needs to use the result of the previous one?

CodePudding user response:

There's a problem with the proposed code here:

  resultMap <- S.effects e
  insertB resultMap e

The problem is that you are "running" the same stream twice, and that is usually problematic for IO-based streams.

For example, imagine myStream reads from a file handle. By the time we invoke insertB for the second pass, effects will have already reached end-of-file! Any further reads from the handle won't return any data.

Of course, we could read the same file twice with two different streams. That preserves streaming, but requires two passes.


It should be noted though that for certain base monads that have built-in resource management, like resourcet, you can run the same Stream value twice, because the stream code is "smart" enough to allocate and deallocate the underlying resources each time the stream is run.

For example, the version of the Stream type present in linear-base supports the function readFile:

readFile :: FilePath -> Stream (Of Text) RIO ()

Which returns a Stream working in a resource-aware IO.

That said, I'm not a fan of hiding such repeated reads of a file in a streaming pipeline, it seems confusing to me.

  • Related