Home > Blockchain >  How to run the continuation of a task in the taskBuilder CE in the same thread of the code before th
How to run the continuation of a task in the taskBuilder CE in the same thread of the code before th

Time:08-01

I am coding a long running operation in FSharp's taks CE as follows

let longRunningTask = Task.Run(...)


// Now let's do the rest of the multi-tasking program

task {
  DO SOMETHING
  let! result = longRunningTask
  DO SOMETHING ELSE
}

The problem is DO SOMETHING ELSE appears to be running on an arbitrary thread (as observed also by printing the current thread id), whereas I absolutely need it to run on the same thread as DO SOMETHING, as I don't want any other form of concurrency except for the longRunningTask.

I've tried in many ways to set the current synchronization context, creating first a unique value of that type, but that doesn't seem to affect the result.

CodePudding user response:

It might be an overkill, but SynchronizationContext may help you. It's used to dispatch delegates to some threads. There's a lot of explanations on how it's working (search for ConfigureAwait(false)), so I'll focus on implementation

type ThreadOwningSyncCtx() =
    inherit SynchronizationContext()

    let _queue = new BlockingCollection<(SendOrPostCallback * obj)>()

    member _.DoWork(cancellationToken: CancellationToken) =
        while not cancellationToken.IsCancellationRequested do
            let (callback, state) = _queue.Take()
            callback.Invoke state
        ()

    override _.Post(callback, state) =
        _queue.Add((callback, state))
        ()

    override _.Send(callback, state) =
        let tcs = TaskCompletionSource()
        let cb s =
            callback.Invoke s
            tcs.SetResult()
        _queue.Add((cb, state))
        tcs.Task.Wait()
        ()

Notes on methods:

  • Post: Method which is executed on async path. This method is called from infrastructure of Task when C# await or F# let! do! completes asynchronously. Callback is queued to be completed sometime.

  • Send: Method which is executed on sync path. It's expected that callback will be executed before this method returns. For example when someone calls a CancellationTokenSource.Cancel or WPF's Dispatcher.Invoke or WinForms Control.Invoke

  • DoWork: Method which blocks current thread to execute all pending callback, because we can't just interrupt thread to perform some task, it must be waiting for it.

Usage:

let syncCtx = ThreadOwningSyncCtx()
// set current sync ctx, so every continuation is queued back to main thread.
// comment this line and `printThreadId` will return different numbers
SynchronizationContext.SetSynchronizationContext syncCtx

let printThreadId() =
    printfn "%d" Thread.CurrentThread.ManagedThreadId

// create cancellation token, so app won't run indefinitely
let cts = new CancellationTokenSource()

// task to simulate some meaningful work
task {
    printThreadId()
    do! Task.Yield() // this action always completes asynchronously
    printThreadId()

    cts.Cancel() // cancel token, so main thread can continue it's work
} |> ignore

// process all pending continuations
syncCtx.DoWork(cts.Token)
  • Related