The code below used to work, but I am converting a lot of my async { } functions to task { } and I can't figure out how to make it run sequentially without a mutable variable.
let processBatchTransaction(page: IPage,
                            bnzBatch: BNZBatch,
                            cardTransactionswithOrder: BNZCardWithOrder list,
                            xeroClient: XeroClientFull) : Task<StatusMessage> = task {
    let matchedTransactionsSeq =
        seq {
            for cardTransactionWithOrder in cardTransactionswithOrder do
                let matchedTransaction = matchTransaction(page, bnzBatch, cardTransactionWithOrder, xeroClient)
                yield matchedTransaction
        }
    let! matchedTransactions = matchedTransactionsSeq |> Async.Sequential
    .... etc
}
The function "matchTransaction" used to be async, but there is no Task.Sequential, only Task.WhenAll, and I need the calls to run one after the other.
CodePudding user response:
As promised (though Brian already gave a very good answer), here's mine. Let's first get a few rules out there:
- A Task created with task { ... } will always be hot-started. That means it runs immediately on the current thread (or on a background thread if you use backgroundTask).
- A Task created through the TPL with the Task<_> class constructors will always be delay-started. Just like an async, you will have to start these by hand.
- Binding any task in a task CE will start that task.
- The next binding expression will only start after the previous one is finished.
This means that, given an array or sequence of Tasks, most likely they're already running by the time you get them. And since they weren't created through bind, they will run asynchronously, overlapping each other. To prevent this, we need a few rules.
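To make the hot-start point concrete, here is a minimal sketch. The names work and started are made up for this illustration, and work only stands in for a task-returning function such as matchTransaction.

```fsharp
open System.Collections.Concurrent
open System.Threading.Tasks

// Hypothetical log of which task bodies have begun executing.
let started = ConcurrentQueue<int>()

// Hypothetical stand-in for a task-returning function such as matchTransaction.
let work (n: int) : Task<int> =
    task {
        started.Enqueue n       // runs synchronously, inside the call to `work`
        do! Task.Delay(50)      // first real suspension point
        return n * 10
    }

// Building the list calls `work` three times, so all three task bodies
// have begun before anything has been awaited.
let hotTasks = [ for n in 1 .. 3 -> work n ]
let startedBeforeAwaiting = started.Count
```

Here startedBeforeAwaiting is already 3 once the list is built, which is why a sequence of ready-made tasks cannot be forced into sequential order after the fact.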
EDIT: check out F# if you haven't already, it can do the below things out of the box.
Rule 0: When dealing with tasks, always return a task
When combining tasks, binding tasks or joining them, always return the result as a task. This will simplify your overall flow. Besides, you cannot return a non-task (or non-async) without blocking the thread.
Rule 1: delay your tasks
Brian suggests doing this through lazy. This is fine. You can also simply use unit functions: fun () -> task { dosomething }.
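Both ways of delaying can be sketched side by side. The counter startCount and the function work are hypothetical and exist only to show when the task body runs.

```fsharp
open System.Threading.Tasks

// Hypothetical module-level counter, for this demonstration only.
let mutable startCount = 0

// Hypothetical stand-in for any task-returning function.
let work (n: int) : Task<int> =
    task {
        startCount <- startCount + 1
        return n
    }

// Delayed as a unit function: nothing runs until the function is applied.
let delayed : unit -> Task<int> = fun () -> work 1

// Delayed with lazy: nothing runs until .Value is forced.
let delayedLazy : Lazy<Task<int>> = lazy (work 2)

let startedBeforeForcing = startCount   // neither task body has run yet

let t1 = delayed ()                     // starts the first task
let t2 = delayedLazy.Value              // starts the second task
let t2Again = delayedLazy.Value         // cached: same Task instance, not restarted
let startedAfterForcing = startCount
```

One difference worth keeping in mind: applying the unit function again creates and starts a new task each time, while a lazy value caches the task it created the first time.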
Rule 2: don't use .Result or .Wait()
These will block your thread.
Rule 3: don't use things like List.traverseTaskResultA
These functions, part of the otherwise excellent FsToolkit.ErrorHandling.TaskResult library, will not execute your tasks as run X -> wait for result -> run Y -> wait for result. Instead, they do run X -> run Y -> run Z -> asTask.
In other words, those library functions will lead to asynchronous, overlapping execution.
Rule 4: don't use Thread.Sleep
Hardly a rule, but while I was testing your scenario, I used Thread.Sleep. The problem is, this function blocks the current thread. It will give the false impression that your functions are running in sequence in your test scenario, but then in your real-world scenario they don't anymore.
Use Task.Delay instead. It works the same, but won't block the thread.
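A rough way to see the difference, assuming two hypothetical functions sleepy and delayed that differ only in how they wait. Timings are approximate and will vary by machine.

```fsharp
open System.Diagnostics
open System.Threading
open System.Threading.Tasks

// Blocks the calling thread: the call itself does not return until the sleep is over.
let sleepy () : Task<int> =
    task {
        Thread.Sleep(200)
        return 1
    }

// Suspends without blocking: the call returns a still-running task almost at once.
let delayed () : Task<int> =
    task {
        do! Task.Delay(200)
        return 1
    }

let sw = Stopwatch.StartNew()
let t1 = sleepy ()
let sleepyCallMs = sw.ElapsedMilliseconds   // roughly the full sleep time
let sleepyDoneOnReturn = t1.IsCompleted     // the task is finished by the time the call returns

sw.Restart()
let t2 = delayed ()
let delayedCallMs = sw.ElapsedMilliseconds  // typically far below the delay
let delayedDoneOnReturn = t2.IsCompleted    // still pending at this point
```

Because sleepy finishes before its call even returns, a list of such tasks looks sequential in a test no matter how it is combined, which is exactly the false impression described above.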
Solution
There are multiple ways to do this. I will just present you a single one using ContinueWith. Steps:
- Your tasks must be returned delayed, as unit functions
- Your tasks don't rely on each other's result (this can easily be changed, though)
- Wrap your tasks in a continuation, and then Unwrap that continuation again.
This last one seems a little odd, but the TPL doesn't come with a standard bind. And the bind from F#, though applicable here, is a little hard to use. The Ply lib comes with an easier bind, you may try that out as well.
Anyway, here's the hardcore way, using only standard library functions. The good news is, you only have to write this function once.
open System.Threading.Tasks

/// Join multiple delayed tasks and return the result of the last
let join tasks =
    let wrapNext (t: unit -> Task<_>) (source: unit -> Task<_>) : unit -> Task<_> =
        fun () ->
            source()
                // this is the CORE of the whole operation
                .ContinueWith((fun (_: Task) -> t ()), TaskContinuationOptions.OnlyOnRanToCompletion)
                // extra step needed, as BCL has no direct way to unwrap nested tasks
                .Unwrap() :?> Task<_>
    let rec combine acc (tasks: (unit -> Task<_>) list) =
        match tasks with
        | [] -> acc
        | t :: tail -> combine (wrapNext t acc) tail
    match tasks with
    | first :: rest -> combine first rest
    | [] -> failwith "oh oh, no tasks given!"
Here's how you can use it. The first function looks a little hacky, but it's just to mimic your scenario and to be able to double-check that it runs sequentially.
open System
open System.IO

/// Create 10 tasks, use stream for writing, otherwise would garble FSI
/// Note that those can be task or backgroundTask
let createBunchOfTasks(sw: StreamWriter) =
    // a ref cell rather than `let mutable`, because closures cannot capture mutable locals
    let x = ref 0
    let rnd () = Random().Next(10, 30)
    let o = obj ()
    let runTask i = backgroundTask {
        let! _ = Task.Delay(rnd ()) // use randomization to ensure tasks last different times
        x.Value <- x.Value + 1
        // just some logging to a file, stdout is not good in this case
        lock o (fun () -> sw.WriteLine(sprintf "Task #%i after delay: %i" i x.Value))
        return x.Value
    }
    [
        // creating bunch of dummy tasks
        for i in 1..10 do
            fun () -> backgroundTask { return! runTask i }
    ]
In your scenario, you won't need the above code, but you will need something like this to call the join function. If you need logging to a file, you can use this pattern, but it was really only here for my sanity check, and is rather crude ;).
let runMultipleTasks() =
    // should give this as argument, as must be closed after all tasks completed
    let file = File.Open("output1.txt", FileMode.Create)
    let stream = new StreamWriter(file)
    // the actual creation of tasks
    let tasks = createBunchOfTasks stream
    let combinedTask = join tasks
    // start the combined tasks
    combinedTask()
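For comparison, the same sequencing can also be sketched with the task CE itself, since a let! inside a loop does not move on until the bound task has finished. This is not the ContinueWith approach above but an alternative under the same assumption that the tasks arrive delayed as unit functions. The names runSequential, step and log are made up for this sketch, and unlike join it collects every result rather than only the last.

```fsharp
open System.Collections.Generic
open System.Threading.Tasks

/// Run delayed tasks one after another and collect every result in order.
let runSequential (thunks: (unit -> Task<'T>) list) : Task<'T list> =
    task {
        let results = ResizeArray<'T>()
        for thunk in thunks do
            let! r = thunk ()   // the next thunk is not invoked until this task completes
            results.Add r
        return List.ofSeq results
    }

// Hypothetical log used to check the execution order.
let log = List<string>()

// Hypothetical delayed task; later tasks are shorter, so overlapping
// execution would show up as a reordered log.
let step (n: int) : unit -> Task<int> =
    fun () ->
        task {
            log.Add($"start {n}")
            do! Task.Delay(30 - n * 5)
            log.Add($"end {n}")
            return n * 2
        }

let all : Task<int list> = runSequential [ for n in 1 .. 3 -> step n ]
```

In the question's scenario the list of unit functions would be built from cardTransactionswithOrder, wrapping each matchTransaction call in fun () -> ..., and the result bound with let! inside the surrounding task. A ResizeArray is a mutable collection, so this only avoids a mutable variable in the narrow sense.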
CodePudding user response:
One thing you can do is use laziness to prevent the tasks from starting too soon. For example:
let createTask n =
    task {
        printfn $"Task {n} started"
        Thread.Sleep(2000)
        printfn $"Task {n} finished"
        return 100 + n
    }
let lazyTasks =
    seq {
        for n = 1 to 10 do
            lazy createTask n
    }
So in your case, you would write:
for cardTransactionWithOrder in cardTransactionswithOrder do
    let lazyMatchedTransaction = lazy matchTransaction(page, bnzBatch, cardTransactionWithOrder, xeroClient)
    yield lazyMatchedTransaction
You can then evaluate any sequence of lazy tasks sequentially like this:
let evalTasksSequential lazyTasks =
    task {
        return seq {
            for (lazyTask : Lazy<Task<_>>) in lazyTasks do
                yield lazyTask.Value.Result // start task and wait for it to finish
        }
    }
Test example:
task {
    let! results = evalTasksSequential lazyTasks
    for result in results do
        printfn $"{result}"
} |> ignore
Output:
Task 1 started
Task 1 finished
101
Task 2 started
Task 2 finished
102
Task 3 started
Task 3 finished
103
Task 4 started
Task 4 finished
104
Task 5 started
Task 5 finished
105
Task 6 started
Task 6 finished
106
Task 7 started
Task 7 finished
107
Task 8 started
Task 8 finished
108
Task 9 started
Task 9 finished
109
Task 10 started
Task 10 finished
110