I am coding a long running operation in FSharp's taks CE as follows
let longRunningTask = Task.Run(...)
// Now let's do the rest of the multi-tasking program
task {
DO SOMETHING
let! result = longRunningTask
DO SOMETHING ELSE
}
The problem is DO SOMETHING ELSE appears to be running on an arbitrary thread (as observed also by printing the current thread id), whereas I absolutely need it to run on the same thread as DO SOMETHING, as I don't want any other form of concurrency except for the longRunningTask.
I've tried in many ways to set the current synchronization context, creating first a unique value of that type, but that doesn't seem to affect the result.
CodePudding user response:
It might be an overkill, but SynchronizationContext may help you. It's used to dispatch delegates to some threads. There's a lot of explanations on how it's working (search for ConfigureAwait(false)
), so I'll focus on implementation
type ThreadOwningSyncCtx() =
inherit SynchronizationContext()
let _queue = new BlockingCollection<(SendOrPostCallback * obj)>()
member _.DoWork(cancellationToken: CancellationToken) =
while not cancellationToken.IsCancellationRequested do
let (callback, state) = _queue.Take()
callback.Invoke state
()
override _.Post(callback, state) =
_queue.Add((callback, state))
()
override _.Send(callback, state) =
let tcs = TaskCompletionSource()
let cb s =
callback.Invoke s
tcs.SetResult()
_queue.Add((cb, state))
tcs.Task.Wait()
()
Notes on methods:
Post
: Method which is executed on async path. This method is called from infrastructure ofTask
when C#await
or F#let!
do!
completes asynchronously. Callback is queued to be completed sometime.Send
: Method which is executed on sync path. It's expected thatcallback
will be executed before this method returns. For example when someone calls aCancellationTokenSource.Cancel
or WPF'sDispatcher.Invoke
or WinFormsControl.Invoke
DoWork
: Method which blocks current thread to execute all pending callback, because we can't just interrupt thread to perform some task, it must be waiting for it.
Usage:
let syncCtx = ThreadOwningSyncCtx()
// set current sync ctx, so every continuation is queued back to main thread.
// comment this line and `printThreadId` will return different numbers
SynchronizationContext.SetSynchronizationContext syncCtx
let printThreadId() =
printfn "%d" Thread.CurrentThread.ManagedThreadId
// create cancellation token, so app won't run indefinitely
let cts = new CancellationTokenSource()
// task to simulate some meaningful work
task {
printThreadId()
do! Task.Yield() // this action always completes asynchronously
printThreadId()
cts.Cancel() // cancel token, so main thread can continue it's work
} |> ignore
// process all pending continuations
syncCtx.DoWork(cts.Token)