I have an app that needs to ingest a lot of data (roughly 25 GB).

During testing the data was small and was loaded entirely into RAM, but now I have to turn this into a stream.

It is segmented into blocks of a few MB each, and the blocks are loaded through an async function that can pull them from disk or from a DB, depending on a few factors.

One approach would be to serve blocks like this:

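let blockSequence =
    seq {
        for id in blockIds do
            // blocks the calling thread until this block has been loaded
            let data = loadDataAsync id |> Async.RunSynchronously
            yield Some data
        // None marks the end of the stream
        yield None
    }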

Since the data sometimes has to be pulled out of a DB, loading can be quite slow, and I would like to add some 'prefetch' so that block data is fetched ahead of consumption time.

One idea I had was to build a list of data loaders:

let loaders =
    blockIds
    |> List.map (fun id -> async { return! loadDataAsync id })

but then I'd be dealing with Async values rather than my actual return type.
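For illustration, the Async type can be kept out of what the consumer sees by starting each load as a task one step ahead and only exposing the finished blocks. This is a minimal sketch, not a tested solution: prefetchOne is a hypothetical helper, and the loadDataAsync defined here is a stand-in that assumes an int id and an int[] block.

```fsharp
open System.Threading.Tasks

// Hypothetical stand-in for loadDataAsync: pretends each block takes 50 ms to load.
let loadDataAsync (id: int) : Async<int[]> =
    async {
        do! Async.Sleep 50
        return Array.create 3 id
    }

// Yields blocks in order while the next one is already loading in the background.
let prefetchOne (load: int -> Async<'T>) (ids: int list) : seq<'T> =
    let rec go (pending: Task<'T>) (rest: int list) : seq<'T> =
        seq {
            // wait for the block that is due now
            let current = pending.Result
            match rest with
            | [] -> yield current
            | next :: tail ->
                // start loading the next block before handing out the current one
                let nextTask = Async.StartAsTask (load next)
                yield current
                yield! go nextTask tail
        }
    seq {
        match ids with
        | [] -> ()
        | first :: rest -> yield! go (Async.StartAsTask (load first)) rest
    }

let firstValues =
    prefetchOne loadDataAsync [ 1; 2; 3 ]
    |> Seq.map (fun block -> block.[0])
    |> Seq.toList
```

This only looks one block ahead; a deeper look-ahead needs some kind of bounded buffer.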

The other idea was to wrap this in a lazy block:

let loaders =
    blockIds
    |> List.map (fun id -> lazy (loadDataAsync id))

so I have a uniform type, but I would need to 'poke' the elements before they are pulled, and overall that would be messy.

Then I'm thinking about a queue where I could always keep x elements loaded ahead of their consumption, with the loading handled on another thread. This could work by having a thread that checks how many elements are in the queue and, if the count drops below a threshold, loads the next block and enqueues it.

Something like:

let queue = ConcurrentQueue<DataType>()
let wait  = new EventWaitHandle (false, EventResetMode.AutoReset)

// some thread ->
//     for id in blockIds do
//         if queue.Count < x then
//             queue.Enqueue (load...)
//
// main thread ->
//     let data = queue.TryDequeue....
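The queue idea above can be sketched with the standard BlockingCollection, which does the threshold bookkeeping itself: Add blocks while the queue is full, and the consumer blocks while it is empty. This is only a sketch under assumptions: loadBlock is a hypothetical synchronous loader, the ids are ints, and the capacity of 2 stands in for x.

```fsharp
open System.Collections.Concurrent
open System.Threading

// Hypothetical loader: returns a small dummy block for the given id.
let loadBlock (id: int) : int[] = Array.create 4 id

let blockIds = [ 1 .. 5 ]

// Bounded queue: the producer can be at most 2 blocks ahead of the consumer.
let queue = new BlockingCollection<int[]>(2)

let producer =
    Thread(ThreadStart(fun () ->
        for id in blockIds do
            // blocks here while the queue already holds 2 items
            queue.Add(loadBlock id)
        // tells the consumer that no more blocks will arrive
        queue.CompleteAdding()))

producer.Start()

// GetConsumingEnumerable exposes the queue as a sequence that ends
// once CompleteAdding has been called and the queue is drained.
let consumed =
    queue.GetConsumingEnumerable()
    |> Seq.map (fun block -> block.[0])
    |> Seq.toList
```

With CompleteAdding there is no need for a None end marker, at the cost of tying the code to BlockingCollection.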

But I can't be the first one to need this, so does anyone have a good solution to propose?


I came up with a solution, but I'm stuck on the consuming part.

// trades buffer
let private prefetch = 10
let private tradesBuffer = BlockingQueueAgent<TradeData [] option>(prefetch)

// producing the data
let thread = Thread(ThreadStart(fun () ->
    async {
        for b in timeBlocks do
            let! data = loadFromCacheAsync (makeFilename instrument interval b)
            // waits here while the buffer already holds `prefetch` items
            do! tradesBuffer.AsyncAdd (Some data)
        // None marks the end of the data
        do! tradesBuffer.AsyncAdd None
    }
    |> Async.RunSynchronously))
thread.Start ()

// consuming it
// (does not compile: yield is not allowed inside the nested function)
seq {
    let rec pullData () =
        match tradesBuffer.Get() with
        | Some data ->
            yield Some data
            pullData ()
        | None ->
            yield None
    pullData ()
}

How can I make the pulling look like a sequence?

This could work:

    seq {
        let mutable keepDoingIt = true
        while keepDoingIt do
            let data = tradesBuffer.Get ()
            yield data
            // stop after the None end marker has been yielded
            if data.IsNone then keepDoingIt <- false
    }

but I'm trying to avoid the mutable (mostly as an exercise, because the code is good enough otherwise).

You can use recursion to avoid the while loop:

let rec loop () =
    seq {
        let data = tradesBuffer.Get ()
        yield data
        // recurse only while real data arrives; None ends the sequence
        if data.IsSome then
            yield! loop ()
    }
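To try the recursive version without the BlockingQueueAgent dependency, here is a self-contained sketch that swaps in the standard BlockingCollection as the buffer. The names buffer and pull, the int[] payload, and the three dummy blocks are assumptions made for the example.

```fsharp
open System.Collections.Concurrent

// Stand-in for tradesBuffer: a bounded queue holding at most 10 items.
let buffer = new BlockingCollection<int[] option>(10)

// Hypothetical producer: three small blocks followed by the None end marker.
let producer =
    async {
        for i in 1 .. 3 do
            buffer.Add(Some [| i; i * 10 |])
        buffer.Add(None)
    }

Async.Start producer

// Same shape as loop () above, reading from the stand-in buffer.
let rec pull () =
    seq {
        // Take blocks until the producer has added an item
        let data = buffer.Take()
        yield data
        if data.IsSome then
            yield! pull ()
    }

let blocks = pull () |> Seq.toList
```

Note that such a sequence is single-use: every enumeration takes items out of the queue, so enumerating it a second time would block waiting for data that never arrives.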
