I'm using F# and have an AsyncSeq<Async<'t>>. Each item will take a varying amount of time to process and does I/O that's rate-limited. I want to run all the operations in parallel and then pass them down the chain as an AsyncSeq<'t> so I can perform further manipulations on them and ultimately AsyncSeq.fold them into a final outcome.

The following AsyncSeq operations almost meet my needs:
- mapAsyncParallel - does the parallelism, but it's unconstrained (and I don't need the order preserved)
- iterAsyncParallelThrottled - parallel and has a max degree of parallelism, but doesn't let me return results (and I don't need the order preserved)

What I really need is something like a mapAsyncParallelThrottled - or, to be precise, a mapAsyncParallelThrottledUnordered.
Things I'm considering:
- use mapAsyncParallel, but use a Semaphore within the function to constrain the parallelism myself, which is probably not going to be optimal in terms of concurrency, and due to buffering the results to reorder them (see the sketch after this list).
- use iterAsyncParallelThrottled and do some ugly folding of the results into an accumulator as they arrive, guarded by a lock, kinda like this - but I don't need the ordering, so it won't be optimal.
- build what I need by enumerating the source and emitting results via AsyncSeqSrc, like this. I'd probably have a set of Async.StartAsTask tasks in flight, and start more after each Task.WaitAny gives me something to AsyncSeqSrc.put, until I reach the maxDegreeOfParallelism.
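A minimal sketch of what I mean by option 1 (the helper name and signature are mine, assuming FSharp.Control.AsyncSeq's mapAsyncParallel): a SemaphoreSlim caps how many mappings actually run at once, but mapAsyncParallel still buffers to preserve order, which I don't need.

open System.Threading
open FSharp.Control

// Hypothetical option 1: bound the concurrency of the mapping function itself,
// while mapAsyncParallel supplies the (order-preserving) parallel machinery.
let mapAsyncParallelSemaphored dop (f: 't -> Async<'u>) (source: AsyncSeq<'t>) : AsyncSeq<'u> =
    let gate = new SemaphoreSlim(dop)
    source
    |> AsyncSeq.mapAsyncParallel (fun x -> async {
        // wait for a free slot before doing the rate-limited work
        do! gate.WaitAsync() |> Async.AwaitTask
        try
            return! f x
        finally
            gate.Release() |> ignore })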
Surely I'm missing a simple answer and there's a better way?
Failing that, I'd love someone to sanity check my option 3, in either direction!
I'm open to using AsyncSeq.toAsyncEnum and then an IAsyncEnumerable-based way of achieving the same outcome, if one exists, though ideally without getting into TPL Dataflow or Rx land if it can be avoided (I've done extensive SO searching for that without results...).
CodePudding user response:
If I'm understanding your requirements correctly, then something like this will work. It effectively combines the unordered iterAsyncParallelThrottled with a System.Threading.Channels channel, so that a mapping can be returned instead of just a side-effecting iteration.
open FSharp.Control
open System.Threading.Channels

let mapAsyncParallelBoundedUnordered boundedAmount (mapper: 't -> Async<'u>) (source: AsyncSeq<'t>) = asyncSeq {
    let! ct = Async.CancellationToken
    let channel : Channel<'u> = Channel.CreateUnbounded()
    // Producer: run the mappings with bounded parallelism, writing each result to
    // the channel as soon as it completes (so completion order, not source order).
    let! _ =
        async {
            do!
                source
                |> AsyncSeq.iterAsyncParallelThrottled boundedAmount (fun s -> async {
                    let! res = mapper s
                    do! channel.Writer.WriteAsync(res, ct).AsTask() |> Async.AwaitTask
                })
            channel.Writer.Complete()
        }
        |> Async.StartChild
    // Consumer: yield results as they arrive.
    for item in channel.Reader.ReadAllAsync(ct) |> AsyncSeq.ofAsyncEnum do
        yield item
}
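A (hypothetical) usage sketch against the question's AsyncSeq<Async<'t>>: with id as the mapper, at most four of the inner operations run at a time, and the results flow on to the fold in whatever order they complete.

// Hypothetical example: ops stands for the question's AsyncSeq<Async<int>>; the names are illustrative.
let sumAll (ops: AsyncSeq<Async<int>>) =
    ops
    |> mapAsyncParallelBoundedUnordered 4 id  // run the inner Asyncs, at most 4 in flight
    |> AsyncSeq.fold (+) 0                    // then fold the results into a final outcome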
With a small variation of the above (using child computations), you can also make it ordered while keeping the parallelism bounded.
let mapAsyncParallelBounded boundedAmount (mapper: 't -> Async<'u>) (source: AsyncSeq<'t>) = asyncSeq {
    let! ct = Async.CancellationToken
    let channel : Channel<Async<'u>> = Channel.CreateBounded(BoundedChannelOptions(boundedAmount))
    // Producer: start each mapping as a child computation and write the handles to the
    // channel in source order; the bounded capacity limits how many are in flight.
    let! _ =
        async {
            do!
                source
                |> AsyncSeq.iterAsync (fun s -> async {
                    let! orderChild = mapper s |> Async.StartChild
                    do! channel.Writer.WriteAsync(orderChild, ct).AsTask() |> Async.AwaitTask
                })
            channel.Writer.Complete()
        }
        |> Async.StartChild
    // Consumer: await each child in order, so results come out in source order.
    for item in channel.Reader.ReadAllAsync(ct) |> AsyncSeq.ofAsyncEnum do
        let! toReturn = item
        yield toReturn
}
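A quick (hypothetical) usage sketch, showing that this variant yields results in source order even when later items finish first:

// Hypothetical check: the 300ms item is yielded first despite finishing last.
let ordered =
    AsyncSeq.ofSeq [ 300; 100; 200 ]
    |> mapAsyncParallelBounded 2 (fun ms -> async { do! Async.Sleep ms; return ms })
    |> AsyncSeq.toListAsync
    |> Async.RunSynchronously
// ordered = [300; 100; 200]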
CodePudding user response:
Here's a testbed I used to validate @akara's excellent work:
#r "nuget:FSharp.Control.AsyncSeq"
open FSharp.Control

module AsyncSeqEx =
    open System.Threading.Channels

    let mapAsyncParallelBoundedUnordered boundedAmount (mapper: 't -> Async<'u>) source = asyncSeq {
        let! ct = Async.CancellationToken
        let channel : Channel<'u> = Channel.CreateUnbounded()
        let handle req = async {
            let! res = mapper req
            do! let t = channel.Writer.WriteAsync(res, ct) in t.AsTask() |> Async.AwaitTask }
        let! _ = Async.StartChild <| async {
            do! source |> AsyncSeq.iterAsyncParallelThrottled boundedAmount handle
            channel.Writer.Complete() }
        yield! channel.Reader.ReadAllAsync(ct) |> AsyncSeq.ofAsyncEnum
    }
I also ported the same code to use AsyncSeqSrc instead of channels, which seems to work too, with equivalent perf:
// AsyncSeqSrc-based reimpl of the above
let mapAsyncParallelBoundedUnordered2 boundedAmount (mapper: 't -> Async<'u>) source = asyncSeq {
    let output = AsyncSeqSrc.create ()
    let handle req = async { let! res = mapper req in AsyncSeqSrc.put res output }
    let! _ = Async.StartChild <| async {
        do! source |> AsyncSeq.iterAsyncParallelThrottled boundedAmount handle
        AsyncSeqSrc.close output }
    yield! AsyncSeqSrc.toAsyncSeq output
}
The following impl, leaning on AsyncSeq.mapAsyncParallel, seems to achieve similar perf to both:
module Async =

    let parallelThrottled dop f = Async.Parallel(f, maxDegreeOfParallelism = dop)

    type Semaphore(max) =
        let inner = new System.Threading.SemaphoreSlim(max)
        member _.Await() = async {
            let! ct = Async.CancellationToken
            return! inner.WaitAsync ct |> Async.AwaitTask }
        member _.Release() =
            inner.Release() |> ignore

    let throttle degreeOfParallelism f =
        let s = Semaphore degreeOfParallelism
        fun x -> async {
            do! s.Await()
            try return! f x
            finally s.Release() }

module AsyncSeq =

    open FSharp.Control

    // see https://stackoverflow.com/a/71065152/11635
    let mapAsyncParallelThrottled degreeOfParallelism (f: 't -> Async<'u>) : AsyncSeq<'t> -> AsyncSeq<'u> =
        let throttle = Async.throttle degreeOfParallelism
        AsyncSeq.mapAsyncParallel (throttle f)
Testbed:
let dop = 10
let r = System.Random()
let durations = Array.init 10000 (fun _ -> r.Next(10, 100))

let work =
    let sleep (x : int) = async {
        do! Async.Sleep x
        return x }
    AsyncSeq.ofSeq durations |> AsyncSeq.mapAsyncParallelThrottled dop sleep

let start = System.Diagnostics.Stopwatch.StartNew()
let results = work |> AsyncSeq.toArrayAsync |> Async.RunSynchronously
let timeTaken = start.ElapsedMilliseconds

let totalTimeTaken = Array.sum results
let expectedWallTime = float totalTimeTaken / float dop
let overhead = timeTaken - int64 expectedWallTime

let inline stringf format (x : ^a) =
    (^a : (member ToString : string -> string) (x, format))
let inline sep x = stringf "N0" x

printfn $"Gross {sep totalTimeTaken}ms Threads {dop} Wall {sep timeTaken}ms overhead {sep overhead}ms ordered: {durations = results}"
Result:
Gross 544,873ms Threads 10 Wall 55,659ms overhead 1,172ms ordered: True
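That is, 544,873ms of total work spread over 10 workers gives an expected wall time of roughly 54,487ms; the measured 55,659ms means the throttling adds only about 2% of overhead, and the output order matches the input.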
For now, it seems that for my use case there's no major win to be had by admitting unordered results, versus simply having the function argument to mapAsyncParallel self-govern to achieve the desired throttling effect.