I'm using the streaming package.
I want to use the result of one step, the one wrapped in S.store, as a parameter to a following step in the pipeline while preserving constant memory. myStream is loaded and parsed from a file.
I have the following example that typechecks:
```haskell
import Control.Monad.IO.Class (MonadIO)
import Data.Function ((&))
import qualified Data.Map.Strict as M
import qualified Streaming.Prelude as S

data A = MkA deriving (Show)

insertA :: MonadIO m => S.Stream (S.Of A) m r -> m (M.Map String Int)
insertA = undefined

insertB :: MonadIO m => M.Map String Int -> S.Stream (S.Of A) m r -> m Int
insertB = undefined

myStream :: S.Stream (S.Of A) IO r
myStream = undefined

run :: IO ()
run =
  myStream
    & S.store insertA
    & insertB M.empty
    -- insertB yields an IO Int; >>= feeds that Int to print
    >>= print
```
However, the line & insertB M.empty passes an empty map, but I want to use the map produced by the previous step, the insertA function.
The insertB function then uses this Map for lookups.
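To make the S.store step concrete, here is a small sketch in which a hypothetical countKeys stands in for insertA (the question leaves insertA undefined). It shows that S.store lets countKeys consume a copy of every element while the elements keep flowing downstream, and that the Map only becomes available as the stream's return value, i.e. after the last element has passed.

```haskell
import qualified Data.Map.Strict as M
import Streaming (Of (..), Stream)
import qualified Streaming.Prelude as S

-- Hypothetical stand-in for insertA: count how often each key occurs.
countKeys :: Monad m => Stream (Of String) m r -> m (M.Map String Int)
countKeys = S.fold_ (\acc k -> M.insertWith (+) k 1 acc) M.empty id

-- S.store hands countKeys a copy of the stream. The elements still reach
-- the downstream consumer (S.toList here), and the Map shows up only as the
-- return value after the whole stream has been consumed.
storeDemo :: IO ([String], M.Map String Int)
storeDemo = do
  (keys :> counts) <- S.toList (S.store countKeys (S.each ["a", "b", "a"]))
  pure (keys, counts)
```

Here the downstream consumer sees all three keys, and the Map {a: 2, b: 1} is only handed over once the stream has ended, which is why a later step cannot use it while the same elements are still passing by.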
The solution I can think of is the following:
```haskell
run :: IO ()
run =
  myStream
    & S.store insertA
    & ( \e -> do
          resultMap <- S.effects e
          insertB resultMap e
      )
    >>= print
```
Questions
Does this preserve the benefits of streaming, such as running in constant memory?
How does it work in the background, given that the whole stream has to be processed to get the Map? Does it pass over the same stream multiple times, loading it from the file twice to preserve constant memory?
If that is the case (the file is loaded twice), what happens if the source of the stream is not a parsed file but some data stream that can be read only once?
Is there another elegant solution to this problem that keeps the benefits of streaming when the next step in a pipeline needs the result of the previous one?
Answer:
There's a problem with the proposed code here:
```haskell
resultMap <- S.effects e
insertB resultMap e
```
The problem is that you are "running" the same stream twice, and that is usually problematic for IO-based streams.
For example, imagine myStream reads from a file handle. By the time we invoke insertB for the second pass, effects will have already reached end-of-file! Any further reads from the handle won't return any data.
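The end-of-file problem can be reproduced with a short sketch. It assumes the stream is built with S.fromHandle (the question does not say how myStream is constructed), and countTwice is a hypothetical helper: it runs one handle-backed Stream value twice with S.length_ and returns both line counts.

```haskell
import qualified Streaming.Prelude as S
import System.IO (IOMode (ReadMode), withFile)

-- Run the *same* handle-backed Stream value twice and count its lines.
-- The first run reads the handle to EOF, so the second run sees no data.
countTwice :: FilePath -> IO (Int, Int)
countTwice path = withFile path ReadMode $ \h -> do
  let stream = S.fromHandle h -- one Stream value, shared by both runs
  n1 <- S.length_ stream      -- first pass: consumes every line
  n2 <- S.length_ stream      -- second pass: the handle is already at EOF
  pure (n1, n2)
```

For a file with three lines, the first count is 3 and the second is 0: the Stream value describes "read from this handle", not "read this file", so replaying it cannot rewind the handle.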
Of course, we could read the same file twice with two different streams. That preserves streaming, but requires two passes.
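A two-pass version might look like the sketch below, assuming each element is one line of a text file. buildIndex and useIndex are hypothetical stand-ins for insertA and insertB. Each pass opens its own handle, so the second pass starts from the beginning of the file, and neither pass holds more than the Map in memory.

```haskell
import qualified Data.Map.Strict as M
import qualified Streaming.Prelude as S
import System.IO (IOMode (ReadMode), withFile)

-- Pass 1 (hypothetical insertA): stream the file once and fold it into a Map.
buildIndex :: FilePath -> IO (M.Map String Int)
buildIndex path = withFile path ReadMode $ \h ->
  S.fold_ (\acc k -> M.insertWith (+) k 1 acc) M.empty id (S.fromHandle h)

-- Pass 2 (hypothetical insertB): open the file again with a fresh handle
-- and look each line up in the Map built by pass 1.
useIndex :: M.Map String Int -> FilePath -> IO Int
useIndex index path = withFile path ReadMode $ \h ->
  S.sum_ (S.map (\k -> M.findWithDefault 0 k index) (S.fromHandle h))

twoPass :: FilePath -> IO Int
twoPass path = do
  index <- buildIndex path
  useIndex index path
```

For a file containing the lines a, b, a, the index is {a: 2, b: 1} and twoPass returns 2 + 1 + 2 = 5. This only works because the file can be reopened; a source that can be read only once cannot be replayed this way.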
It should be noted, though, that for certain base monads with built-in resource management, like resourcet, you can run the same Stream value twice, because the stream code is "smart" enough to allocate and deallocate the underlying resources each time the stream is run.
For example, the version of the Stream type in linear-base provides a readFile function:
```haskell
readFile :: FilePath -> Stream (Of Text) RIO ()
```
which returns a Stream running in RIO, a resource-aware IO.
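The "smart" behaviour can be approximated in plain IO by a Stream value that opens its own handle every time it is run. This is a minimal sketch, not the resourcet or linear-base API: linesOf and runTwice are hypothetical names, and the handle is only closed if the stream is consumed to the end (an early stop or an exception skips hClose), which is the kind of gap resource-management layers like resourcet address.

```haskell
import Control.Monad.IO.Class (liftIO)
import Streaming (Of, Stream)
import qualified Streaming.Prelude as S
import System.IO (IOMode (ReadMode), hClose, openFile)

-- A Stream value that acquires a fresh handle each time it is run,
-- so running it twice reads the file twice from the beginning.
-- Caveat: hClose only runs if the consumer reaches the end of the stream.
linesOf :: FilePath -> Stream (Of String) IO ()
linesOf path = do
  h <- liftIO (openFile path ReadMode)
  S.fromHandle h
  liftIO (hClose h)

-- Run the same Stream value twice; each run reopens the file.
runTwice :: FilePath -> IO (Int, Int)
runTwice path = do
  let stream = linesOf path
  n1 <- S.length_ stream
  n2 <- S.length_ stream
  pure (n1, n2)
```

For a two-line file both counts are 2, in contrast to the handle-backed stream, whose second run sees nothing. The file is still read twice; the re-read is just hidden inside the Stream value.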
That said, I'm not a fan of hiding such repeated reads of a file in a streaming pipeline; it seems confusing to me.