I have an app that needs to ingest a lot of data (roughly 25 GB).
The data used to be small during testing and was all loaded in RAM, but now I have to change this into a stream.
It is segmented into blocks of a few MB each, and the blocks are loaded through an async function that can pull blocks from disk or from a database, depending on a few factors.
One approach would be to serve blocks like this:
```fsharp
let blockSequence =
    seq {
        for id in blockIds do
            let data = loadDataAsync id |> Async.RunSynchronously
            yield Some data
        // None marks the end of the stream
        yield None
    }
```
Since the data sometimes has to be pulled out of a database, loading can be quite slow, and I would like to add some prefetching so that block data is fetched ahead of consumption time.
One idea I had was to build a list of data loaders:
```fsharp
let loaders =
    blockIds
    |> List.map (fun id -> async { return! loadDataAsync id })
```
but then I'd be dealing with a mix of Async types and my actual return type.
The other idea was to wrap this in a lazy block:
```fsharp
let loaders =
    blockIds
    |> List.map (fun id -> lazy (loadDataAsync id |> Async.RunSynchronously))
```
This gives me a uniform type, but I would need to 'poke' (force) the elements before they are pulled, and overall that would be messy.
Then I thought about a queue where I could always keep x elements loaded ahead of their consumption, with the loading handled on another thread. This could work by having a thread that checks how many elements are in the queue and, if the count drops below a threshold, loads the next block and enqueues it.
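The "list of async loaders" and "poke ahead" ideas can be combined without a dedicated thread: start the next few loads as tasks and only block on the oldest one when the consumer asks for it. A minimal sketch, assuming a loader of type `'Id -> Async<'T>` like `loadDataAsync`; the name `prefetch` and the `ahead` window are hypothetical. Note that up to `ahead` loads run concurrently, which may or may not suit the database.

```fsharp
open System.Collections.Generic
open System.Threading.Tasks

/// Sketch: yields the results of `load` in the order of `ids`, keeping up to
/// `ahead` loads running in the background while the consumer works.
let prefetch (ahead: int) (load: 'Id -> Async<'T>) (ids: seq<'Id>) : seq<'T> =
    seq {
        let window = max 1 ahead
        // FIFO of loads that have already been started
        let inFlight = Queue<Task<'T>>()
        use e = ids.GetEnumerator()
        // prime the window with the first `window` loads
        while inFlight.Count < window && e.MoveNext() do
            inFlight.Enqueue(Async.StartAsTask(load e.Current))
        while inFlight.Count > 0 do
            let oldest = inFlight.Dequeue()
            // start the next load before blocking on the oldest one
            if e.MoveNext() then
                inFlight.Enqueue(Async.StartAsTask(load e.Current))
            // blocks until this block has arrived (a failed load throws AggregateException)
            yield oldest.Result
    }
```

Usage would look like `prefetch 3 loadDataAsync blockIds`, which is still an ordinary `seq` that the consumer iterates at its own pace.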
Something like:
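```fsharp
open System.Collections.Concurrent
open System.Threading

let queue = ConcurrentQueue<DataType>()
let wait = new EventWaitHandle(false, EventResetMode.AutoReset)

// some thread: keep at most x blocks buffered
let loader = Thread(ThreadStart(fun () ->
    for id in blockIds do
        // sleep until the consumer frees a slot
        while queue.Count >= x do
            wait.WaitOne() |> ignore
        queue.Enqueue(loadDataAsync id |> Async.RunSynchronously)))
loader.Start()

// main thread
match queue.TryDequeue() with
| true, data -> wait.Set() |> ignore // a slot was freed; wake the loader
| false, _ -> () // nothing buffered yet
```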
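The threshold-and-signal logic above is essentially what `System.Collections.Concurrent.BlockingCollection<T>` provides when it is created with a bounded capacity: `Add` blocks while the collection is full, and `GetConsumingEnumerable()` blocks while it is empty and completes after `CompleteAdding()`. A minimal sketch, assuming the same kind of `'Id -> Async<'T>` loader; `bufferedSeq` is a hypothetical name.

```fsharp
open System.Collections.Concurrent
open System.Threading

/// Sketch: a background thread loads blocks into a bounded buffer and the
/// consumer reads them back as an ordinary seq.
let bufferedSeq (capacity: int) (load: 'Id -> Async<'T>) (ids: seq<'Id>) : seq<'T> =
    // Add blocks once `capacity` loaded blocks are waiting to be consumed
    let buffer = new BlockingCollection<'T>(max 1 capacity)
    let failure : exn option ref = ref None
    let producer =
        Thread(ThreadStart(fun () ->
            try
                try
                    for blockId in ids do
                        buffer.Add(load blockId |> Async.RunSynchronously)
                with ex ->
                    // remember the error instead of crashing the background thread
                    failure.Value <- Some ex
            finally
                // lets the consuming enumeration below finish
                buffer.CompleteAdding()))
    producer.IsBackground <- true
    producer.Start()
    seq {
        // blocks while the buffer is empty; ends after CompleteAdding
        yield! buffer.GetConsumingEnumerable()
        // rethrow a producer failure after the blocks loaded so far
        match failure.Value with
        | Some ex -> raise ex
        | None -> ()
    }
```

Loading starts as soon as `bufferedSeq` is called, not when the sequence is first enumerated, and the result is meant to be enumerated once.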
But I can't be the first one to need this, so does anyone have a good solution to propose?
Edit:
I came up with a solution, but I'm stuck on the consuming part.
```fsharp
open System.Threading

// trades buffer
let private prefetch = 10
let private tradesBuffer = BlockingQueueAgent<TradeData [] option>(prefetch)

// producing the data
let thread = Thread(ThreadStart(fun _ ->
    async {
        for b in timeBlocks do
            let! data = loadFromCacheAsync (makeFilename instrument interval b)
            do! tradesBuffer.AsyncAdd (Some data)
        do! tradesBuffer.AsyncAdd (None)
    }
    |> Async.RunSynchronously))
// the thread does nothing until it is started
thread.Start()
```
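`BlockingQueueAgent` is not part of FSharp.Core, so its exact API is not reproduced here. To illustrate how such a bounded buffer can be built on `MailboxProcessor`, here is a minimal sketch; `BoundedAgent` and `BufferMsg` are hypothetical names, and only an `AsyncAdd`/`AsyncGet` pair is modelled (a synchronous get would be `AsyncGet() |> Async.RunSynchronously`). The key trick is `Scan`, which leaves non-matching messages in the mailbox, so `Add` requests simply wait while the buffer is full and `Get` requests wait while it is empty.

```fsharp
/// Messages for the hypothetical BoundedAgent
type BufferMsg<'T> =
    | Add of 'T * AsyncReplyChannel<unit>
    | Get of AsyncReplyChannel<'T>

/// Hypothetical bounded FIFO: AsyncAdd waits while `capacity` items are
/// buffered, AsyncGet waits while the buffer is empty.
type BoundedAgent<'T>(capacity: int) =
    let limit = max 1 capacity
    let agent =
        MailboxProcessor.Start(fun (inbox: MailboxProcessor<BufferMsg<'T>>) ->
            let queue = System.Collections.Generic.Queue<'T>()
            let rec loop () =
                async {
                    if queue.Count = 0 then
                        // empty: Scan leaves Get requests queued, takes the first Add
                        return! inbox.Scan(fun msg ->
                            match msg with
                            | Add (item, reply) -> Some(enqueue item reply)
                            | Get _ -> None)
                    elif queue.Count >= limit then
                        // full: Scan leaves Add requests queued, takes the first Get
                        return! inbox.Scan(fun msg ->
                            match msg with
                            | Get reply -> Some(dequeue reply)
                            | Add _ -> None)
                    else
                        let! msg = inbox.Receive()
                        match msg with
                        | Add (item, reply) -> return! enqueue item reply
                        | Get reply -> return! dequeue reply
                }
            and enqueue item (reply: AsyncReplyChannel<unit>) =
                async {
                    queue.Enqueue item
                    reply.Reply()
                    return! loop ()
                }
            and dequeue (reply: AsyncReplyChannel<'T>) =
                async {
                    reply.Reply(queue.Dequeue())
                    return! loop ()
                }
            loop ())

    /// Completes once the item has been accepted into the buffer
    member this.AsyncAdd(item: 'T) : Async<unit> =
        agent.PostAndAsyncReply(fun reply -> Add(item, reply))

    /// Completes with the oldest buffered item
    member this.AsyncGet() : Async<'T> =
        agent.PostAndAsyncReply(fun reply -> Get reply)
```

Because `AsyncAdd` waits asynchronously rather than blocking a thread, a producer written against an agent like this could also be started with `Async.Start` instead of a dedicated `Thread`.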
```fsharp
// consuming it
// PSEUDO CODE THAT CAN'T WORK
seq {
    let rec pullData () =
        match tradesBuffer.Get() with
        | Some data ->
            yield Some data
            pullData ()
        | None ->
            yield None
    pullData ()
}
```
How can I make the pulling look like a sequence?
This could work:
```fsharp
seq {
    let mutable keepDoingIt = true
    while keepDoingIt do
        let data = tradesBuffer.Get ()
        yield data
        if data.IsNone then keepDoingIt <- false
}
```
but I'm trying to avoid the mutable variable (mostly as an exercise, since this is good enough otherwise).
Answer:
You can use recursion to avoid the while loop:
```fsharp
let rec loop () =
    seq {
        let data = tradesBuffer.Get ()
        yield data
        if data.IsSome then
            yield! loop ()
    }
```
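If you would rather use a library function than explicit recursion, `Seq.unfold` can express the same loop with an immutable state flag. A minimal sketch; `pullUntilNone` is a hypothetical name, and it takes the pull operation as a function so it can be tried without the buffer.

```fsharp
/// Sketch: call `get` until it returns None, emitting every value including
/// the final None (the same output as the recursive `loop` above).
let pullUntilNone (get: unit -> 'T option) : seq<'T option> =
    // the state records whether the terminating None has already been emitted
    let step finished =
        if finished then
            None
        else
            let data = get ()
            Some(data, Option.isNone data)
    Seq.unfold step false
```

Assuming `tradesBuffer.Get ()` returns `TradeData [] option` as in the question, the consumer would be `pullUntilNone (fun () -> tradesBuffer.Get ())`.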