Unordered F# AsyncSeq.mapParallel with throttling

Time:02-11

I'm using F# and have an AsyncSeq<Async<'t>>. Each item will take a varying amount of time to process and does I/O that's rate-limited.

I want to run all the operations in parallel and then pass the results down the chain as an AsyncSeq<'t>, so I can perform further manipulations on them and ultimately AsyncSeq.fold them into a final outcome.

The following AsyncSeq operations almost meet my needs:

  • mapAsyncParallel: does the parallelism, but it's unbounded (and it preserves order, which I don't need)
  • iterAsyncParallelThrottled: parallel with a maximum degree of parallelism, but doesn't let me return results (and, again, I don't need the order preserved)

What I really need is something like a mapAsyncParallelThrottled or, more precisely, a mapAsyncParallelThrottledUnordered.

Things I'm considering:

  1. Use mapAsyncParallel, but constrain the parallelism myself with a Semaphore inside the mapping function. This is probably not going to be optimal in terms of concurrency, and the results would still be buffered in order to reorder them.
  2. Use iterAsyncParallelThrottled and do some ugly folding of the results into an accumulator as they arrive, guarded by a lock, kinda like this. But I don't need the ordering, so it won't be optimal.
  3. Build what I need by enumerating the source and emitting results via AsyncSeqSrc like this. I'd probably keep a set of Async.StartAsTask tasks in flight: each time Task.WaitAny gives me a finished one, I AsyncSeqSrc.put its result and start another, so that at most maxDegreeOfParallelism are running at once.
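A minimal sketch of option 3, under some assumptions: it is written against plain Async/Task with a callback instead of AsyncSeqSrc so it runs without extra packages (in the real pipeline the callback would presumably be `fun x -> AsyncSeqSrc.put x src`), it uses the non-blocking Task.WhenAny in place of Task.WaitAny, and `runThrottledUnordered` is a hypothetical name, not an AsyncSeq function. It keeps at most `dop` tasks running, hands each result on in completion order, and starts the next input whenever a slot frees up.

```fsharp
open System.Collections.Generic
open System.Threading.Tasks

/// Hypothetical helper sketching option 3: at most `dop` mappings run at once,
/// each result goes to `onResult` in completion order (not input order),
/// and the next input is started as soon as a running task finishes.
let runThrottledUnordered (dop: int) (mapper: 't -> Async<'u>) (onResult: 'u -> unit) (inputs: seq<'t>) : Async<unit> =
    async {
        use e = inputs.GetEnumerator()
        let inFlight = HashSet<Task<'u>>()
        // A ref cell rather than `let mutable`: closures inside async cannot capture mutable locals
        let sourceHasMore = ref true
        // Start the next input if there is one; otherwise record that the source is exhausted
        let tryStartNext () =
            if e.MoveNext() then
                inFlight.Add(Async.StartAsTask(mapper e.Current)) |> ignore
            else
                sourceHasMore.Value <- false
        // Fill the initial window of up to `dop` running tasks
        while sourceHasMore.Value && inFlight.Count < dop do
            tryStartNext ()
        // Whenever one task finishes, hand its result on and refill the freed slot
        while inFlight.Count > 0 do
            let! finished = Task.WhenAny(inFlight) |> Async.AwaitTask
            inFlight.Remove finished |> ignore
            onResult finished.Result // rethrows (wrapped in AggregateException) if that mapping failed
            if sourceHasMore.Value then tryStartNext ()
    }
```

The results reach `onResult` one at a time from the driving loop, so the callback itself needs no locking; a failure in any mapping stops the loop while the other started tasks keep running, which a production version would want to handle.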

Surely I'm missing a simple answer and there's a better way?

Failing that, would love someone to sanity check my option 3 in either direction!

I'm open to using AsyncSeq.toAsyncEnum and then use an IAsyncEnumerable way of achieving the same outcome if that exists, though ideally without getting into TPL DataFlow or RX land if it can be avoided (I've done extensive SO searching for that without results...).
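On the IAsyncEnumerable side, one way to stay clear of TPL Dataflow and Rx is a SemaphoreSlim to cap concurrency plus a System.Threading.Channels channel to publish results as they complete; `ChannelReader.ReadAllAsync` then exposes them as an `IAsyncEnumerable<'u>`, which AsyncSeq.ofAsyncEnum could turn back into an AsyncSeq. This is a sketch assuming F# 6 or later (for `task { }`); `mapThrottledUnordered` and `drain` are hypothetical helpers, not library functions.

```fsharp
open System.Collections.Generic
open System.Threading
open System.Threading.Channels
open System.Threading.Tasks

/// Hypothetical helper: run `mapper` over `source` with at most `dop` calls in flight
/// and expose the results, in completion order, as an IAsyncEnumerable.
let mapThrottledUnordered (dop: int) (mapper: 't -> Task<'u>) (source: seq<'t>) : IAsyncEnumerable<'u> =
    let channel = Channel.CreateUnbounded<'u>()
    let gate = new SemaphoreSlim(dop)
    // Fire-and-forget producer; its outcome reaches the consumer through the channel
    task {
        try
            let running = ResizeArray<Task>()
            for item in source do
                // Wait for a free slot before starting the next mapping
                do! gate.WaitAsync()
                running.Add(task {
                    try
                        let! result = mapper item
                        do! channel.Writer.WriteAsync(result)
                    finally
                        gate.Release() |> ignore })
            do! Task.WhenAll(running)
            channel.Writer.Complete()
        with ex ->
            // Complete with the error so the reader fails instead of waiting forever
            channel.Writer.Complete(ex)
    }
    |> ignore
    channel.Reader.ReadAllAsync()

/// Hypothetical helper: read an IAsyncEnumerable to the end and collect its items
let drain (xs: IAsyncEnumerable<'a>) : Task<'a list> =
    task {
        let e = xs.GetAsyncEnumerator()
        let items = ResizeArray<'a>()
        let more = ref true
        while more.Value do
            let! hasNext = e.MoveNextAsync()
            if hasNext then items.Add e.Current else more.Value <- false
        do! e.DisposeAsync()
        return List.ofSeq items
    }
```

The semaphore bounds how many mappings run, while the unbounded channel only ever holds finished results, so there is no reordering buffer.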

Answer:

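If I'm understanding your requirements, something like this will work. It combines the unordered iterAsyncParallelThrottled with a System.Threading.Channels channel: each result is written to the channel as soon as it completes and read back out as an AsyncSeq, which gives you a mapping rather than just an iteration.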

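open System.Threading.Channels
open FSharp.Control

let mapAsyncParallelBoundedUnordered boundedAmount (mapper: 't -> Async<'u>) (source: AsyncSeq<'t>) = asyncSeq {
    let! ct = Async.CancellationToken
    // Results are written in completion order, so an unbounded channel is enough;
    // iterAsyncParallelThrottled already caps how many mappings run at once
    let channel : Channel<'u> = Channel.CreateUnbounded()

    let! _ =
        async {
            try
                do!
                    source
                    |> AsyncSeq.iterAsyncParallelThrottled boundedAmount (fun s -> async {
                        let! result = mapper s
                        // WriteAsync returns a ValueTask, which Async cannot await directly
                        do! channel.Writer.WriteAsync(result, ct).AsTask() |> Async.AwaitTask
                    })
                channel.Writer.Complete()
            with ex ->
                // Complete with the error so the reader fails instead of waiting forever
                channel.Writer.Complete(ex)
        }
        |> Async.StartChild

    // The channel holds plain results (not Async values), so they can be yielded directly
    yield! channel.Reader.ReadAllAsync(ct) |> AsyncSeq.ofAsyncEnum
}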
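One property this channel-based design leans on: the reader only finishes once the writer is completed, so a producer that can fail should complete the writer with its exception. Below is a small BCL-only check (F# 6+ for `task { }`; `readUntilDone` is a hypothetical helper) showing that an unbounded channel still delivers the items written before the failure and then surfaces the exception, rather than leaving the reader waiting.

```fsharp
open System.Threading.Channels
open System.Threading.Tasks

/// Hypothetical helper: read a channel to the end, returning the items delivered
/// and the exception (if any) the writer completed with.
let readUntilDone (reader: ChannelReader<'a>) : Task<'a list * exn option> =
    task {
        let items = ResizeArray<'a>()
        try
            let more = ref true
            while more.Value do
                // Returns false once the writer completed normally and the buffer is empty;
                // throws once the writer completed with an error and the buffer is empty
                let! canRead = reader.WaitToReadAsync()
                if canRead then
                    let draining = ref true
                    while draining.Value do
                        match reader.TryRead() with
                        | true, item -> items.Add item
                        | _ -> draining.Value <- false
                else
                    more.Value <- false
            return (List.ofSeq items, None)
        with ex ->
            return (List.ofSeq items, Some ex)
    }
```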

With a small variation (starting each mapping as a child computation and pushing the child handles through a bounded channel), you can also make it ordered with bounded parallelism.

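let mapAsyncParallelBounded boundedAmount (mapper: 't -> Async<'u>) (source: AsyncSeq<'t>) = asyncSeq {
    let! ct = Async.CancellationToken
    // The bounded channel holds already-started child computations in input order; its
    // capacity limits how far ahead the producer runs, so roughly boundedAmount + 2 mappings
    // can be in flight (one waiting to be enqueued, one being awaited by the reader)
    let channel : Channel<Async<'u>> = Channel.CreateBounded(BoundedChannelOptions(boundedAmount))

    let! _ =
        async {
            try
                do!
                    source
                    |> AsyncSeq.iterAsync (fun s -> async {
                        let! child = mapper s |> Async.StartChild
                        do! channel.Writer.WriteAsync(child, ct).AsTask() |> Async.AwaitTask
                    })
                // Without this the reader never sees the end of the sequence
                channel.Writer.Complete()
            with ex ->
                channel.Writer.Complete(ex)
        }
        |> Async.StartChild

    // Await the children in the order they were started, preserving input order
    for child in channel.Reader.ReadAllAsync(ct) |> AsyncSeq.ofAsyncEnum do
        let! result = child
        yield result
}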
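The ordered variant relies on pushing already-started computations through a bounded channel in input order and awaiting them in that same order. Below is a BCL-only sketch of just that mechanism, assuming F# 6+ for `task { }`; `mapBoundedOrdered` is a hypothetical name. The channel capacity limits how far ahead the producer can run, and results come out in input order even when later items finish first.

```fsharp
open System.Threading.Channels
open System.Threading.Tasks

/// Hypothetical sketch: start each mapping as soon as the bounded channel has room for
/// its handle, but produce results strictly in input order by awaiting handles in order.
/// Sketch only: if a mapping fails, the consumer stops and the producer may stay blocked.
let mapBoundedOrdered (capacity: int) (mapper: 't -> Task<'u>) (source: seq<'t>) : Task<'u list> =
    let channel = Channel.CreateBounded<Task<'u>>(BoundedChannelOptions(capacity))
    // Producer: start a mapping, then wait until the channel has room for its handle
    let producer =
        task {
            try
                for item in source do
                    do! channel.Writer.WriteAsync(mapper item)
                channel.Writer.Complete()
            with ex ->
                channel.Writer.Complete(ex)
        }
    // Consumer: await the handles in the order they were enqueued
    task {
        let results = ResizeArray<'u>()
        let reader = channel.Reader
        let more = ref true
        while more.Value do
            let! canRead = reader.WaitToReadAsync()
            if canRead then
                match reader.TryRead() with
                | true, started ->
                    let! value = started
                    results.Add value
                | _ -> ()
            else
                more.Value <- false
        do! producer
        return List.ofSeq results
    }
```

At most `capacity` handles sit in the channel, plus one started mapping waiting to be enqueued and one being awaited, so in-flight work stays at or below `capacity + 2`.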

Answer:

Here's a testbed I used to validate @akara's excellent work:

#r "nuget:FSharp.Control.AsyncSeq"
open FSharp.Control
module AsyncSeqEx =

    open System.Threading.Channels

    let mapAsyncParallelBoundedUnordered boundedAmount (mapper: 't -> Async<'u>) source = asyncSeq {
        let! ct = Async.CancellationToken
        let channel : Channel<'u> = Channel.CreateUnbounded()
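        let handle req = async {
            let! res = mapper req
            // WriteAsync returns a ValueTask; convert it to a Task so Async can await it
            do! channel.Writer.WriteAsync(res, ct).AsTask() |> Async.AwaitTask }
        let! _ = Async.StartChild <| async {
            try
                do! source |> AsyncSeq.iterAsyncParallelThrottled boundedAmount handle
                channel.Writer.Complete()
            with ex ->
                // Complete with the error so the reader fails instead of waiting forever
                channel.Writer.Complete(ex) }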
        yield! channel.Reader.ReadAllAsync(ct) |> AsyncSeq.ofAsyncEnum
    }

I also ported the same code to use AsyncSeqSrc instead of channels; it seems to work too, with equivalent perf:

    // AsyncSeqSrc-based reimpl of the above
    let mapAsyncParallelBoundedUnordered2 boundedAmount (mapper: 't -> Async<'u>) source = asyncSeq {
        let output = AsyncSeqSrc.create ()
        let handle req = async { let! res = mapper req in AsyncSeqSrc.put res output }
        let! _ = Async.StartChild <| async {
            do! source |> AsyncSeq.iterAsyncParallelThrottled boundedAmount handle
            AsyncSeqSrc.close output }
        yield! AsyncSeqSrc.toAsyncSeq output
    }

The following implementation, which leans on AsyncSeq.mapAsyncParallel and throttles inside the mapping function with a semaphore, seems to achieve similar perf to both:

module Async =

    let parallelThrottled dop f = Async.Parallel(f, maxDegreeOfParallelism = dop)
    type Semaphore(max) =
        let inner = new System.Threading.SemaphoreSlim(max)
        member _.Await() = async {
            let! ct = Async.CancellationToken
            return! inner.WaitAsync ct |> Async.AwaitTask }
        member _.Release() =
            inner.Release() |> ignore
    let throttle degreeOfParallelism f =
        let s = Semaphore degreeOfParallelism
        fun x -> async {
            do! s.Await()
            try return! f x
            finally s.Release() }

module AsyncSeq =

    open FSharp.Control

    // see https://stackoverflow.com/a/71065152/11635
    let mapAsyncParallelThrottled degreeOfParallelism (f: 't -> Async<'u>) : AsyncSeq<'t> -> AsyncSeq<'u> =
        let throttle = Async.throttle degreeOfParallelism
        AsyncSeq.mapAsyncParallel (throttle f)
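Why this works: mapAsyncParallel may start every element straight away, but each started computation first queues on the shared semaphore, so at most `degreeOfParallelism` bodies actually run at once. A self-contained check of that property is sketched below; it substitutes Async.Parallel (also unbounded when no maxDegreeOfParallelism is given) for AsyncSeq.mapAsyncParallel so it needs no packages, and `throttle` is a local copy of the helper above written directly against SemaphoreSlim.

```fsharp
open System.Threading

/// Local copy of the semaphore-based throttle: at most `dop` invocations of `f` run at once
let throttle (dop: int) (f: 't -> Async<'u>) : 't -> Async<'u> =
    let sem = new SemaphoreSlim(dop)
    fun x ->
        async {
            let! ct = Async.CancellationToken
            do! sem.WaitAsync(ct) |> Async.AwaitTask
            try
                return! f x
            finally
                sem.Release() |> ignore
        }

// Instrumented work item that tracks how many bodies are running at the same time
let gate = obj ()
let current = ref 0
let peak = ref 0
let work (x: int) =
    async {
        lock gate (fun () ->
            current.Value <- current.Value + 1
            peak.Value <- max peak.Value current.Value)
        do! Async.Sleep 10
        lock gate (fun () -> current.Value <- current.Value - 1)
        return x
    }

// Async.Parallel without maxDegreeOfParallelism schedules all 50 computations at once;
// only the single semaphore shared by `throttle 4 work` limits how many bodies run
let results =
    [| 1 .. 50 |]
    |> Array.map (throttle 4 work)
    |> Async.Parallel
    |> Async.RunSynchronously
```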

Testbed:

let dop = 10
let r = System.Random()
let durations = Array.init 10000 (fun _ -> r.Next(10, 100))
let work =
    let sleep (x : int) = async {
        do! Async.Sleep x
        return x
    }
    AsyncSeq.ofSeq durations |> AsyncSeq.mapAsyncParallelThrottled dop sleep
let start = System.Diagnostics.Stopwatch.StartNew()
let results = work |> AsyncSeq.toArrayAsync |> Async.RunSynchronously
let timeTaken = start.ElapsedMilliseconds
let totalTimeTaken = Array.sum results
let expectedWallTime = float totalTimeTaken / float dop
let overhead = timeTaken - int64 expectedWallTime
let inline stringf format (x : ^a) =
    (^a : (member ToString : string -> string) (x, format))
let inline sep x = stringf "N0" x
printfn $"Gross {sep totalTimeTaken}ms Threads {dop} Wall {sep timeTaken}ms overhead {sep overhead}ms ordered: {durations = results}"

Result:

Gross 544,873ms Threads 10 Wall 55,659ms overhead 1,172ms ordered: True
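As a sanity check on those figures: the overhead is the measured wall time minus the ideal wall time (gross sleep time divided by the degree of parallelism, truncated by `int64`), and the gross figure is close to what `r.Next(10, 100)` predicts, since it draws integers from 10 to 99 with a mean of 54.5 ms:

```latex
\begin{aligned}
T_\text{expected} &= \frac{T_\text{gross}}{\text{dop}} = \frac{544{,}873\ \text{ms}}{10} = 54{,}487.3\ \text{ms} \\
\text{overhead} &= T_\text{wall} - \lfloor T_\text{expected} \rfloor = 55{,}659 - 54{,}487 = 1{,}172\ \text{ms} \approx 2.2\%\ \text{of}\ T_\text{expected} \\
\mathbb{E}[T_\text{gross}] &= 10{,}000 \times \frac{10 + 99}{2}\ \text{ms} = 545{,}000\ \text{ms}
\end{aligned}
```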

For now, it seems that for my use case there's no major win to be had from admitting unordered results versus simply having the function passed to mapAsyncParallel throttle itself (via the semaphore) to achieve the desired effect.
