TPL Dataflow: How to start the next async action when the current one hasn't finished yet, pres

Time:09-08

Consider the following program, which uses TPL Dataflow; ActionBlock comes from that library.

internal static class Program
{
    public static async Task Main(string[] args)
    {
        var actionBlock = new ActionBlock<int>(async i =>
        {
            Console.WriteLine($"Started with {i}");
            await DoSomethingAsync(i);
            Console.WriteLine($"Done with {i}");
        });

        for (int i = 0; i < 5; i++)
        {
            actionBlock.Post(i);
        }

        actionBlock.Complete();
        await actionBlock.Completion;
    }

    private static async Task DoSomethingAsync(int i)
    {
        await Task.Delay(1000);
    }
}

The output of this program is:

Started with 0
Done with 0
Started with 1
Done with 1
Started with 2
Done with 2
Started with 3
Done with 3
Started with 4
Done with 4

The reason is that the ActionBlock only starts processing the next message once the previous asynchronous task has finished.

How can I force it to start processing the next message even though the previous one hasn't fully finished? MaxDegreeOfParallelism isn't an option, as that can mess up the order.

So I'd like the output to be:

Started with 0
Started with 1
Started with 2
Started with 3
Started with 4
Done with 0
Done with 1
Done with 2
Done with 3
Done with 4

I could get rid of the async/await and replace it with ContinueWith. But that has two disadvantages:

  1. The ActionBlock thinks it's done with the message immediately. A call to Complete() would then complete the pipeline right away, instead of after the asynchronous actions have completed.
  2. I'd like to add a BoundedCapacity to limit the number of messages that are still waiting to be fully finished. But because of 1., this BoundedCapacity has no effect.
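
To make those two disadvantages concrete, here is a minimal sketch of the ContinueWith variant. The class name, the RunAsync helper and the finished counter are made up for illustration and are not part of the original program. The delegate returns as soon as the continuation is registered, so the block should report completion while all five continuations are still pending, and the BoundedCapacity of 2 should never hold anything back.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

internal static class ContinueWithSketch
{
    // Hypothetical helper: returns how many continuations had finished
    // at the moment the block reported completion.
    public static async Task<int> RunAsync()
    {
        int finished = 0;

        var actionBlock = new ActionBlock<int>(i =>
        {
            Console.WriteLine($"Started with {i}");

            // Fire and forget: the delegate returns right after registering
            // the continuation, so the block considers the message handled.
            Task.Delay(1000).ContinueWith(t =>
            {
                Interlocked.Increment(ref finished);
                Console.WriteLine($"Done with {i}");
            });
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });

        for (int i = 0; i < 5; i++)
        {
            // SendAsync waits for free capacity; here it barely waits at all,
            // because each message leaves the block almost instantly.
            await actionBlock.SendAsync(i);
        }

        actionBlock.Complete();
        await actionBlock.Completion;

        // The block is "complete" although the delayed work is still running.
        return Volatile.Read(ref finished);
    }
}
```

Calling `await ContinueWithSketch.RunAsync()` should give 0 in practice, since the one-second delays have not elapsed when Completion finishes. A program that exits right after that may never print the "Done with" lines at all.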

CodePudding user response:

In situations like this I would try to remove the requirement that things get processed in order, so that you can process in parallel, and then report sequentially.

//The transform block can process everything in parallel,
//but by default the inputs and outputs remain ordered
var processStuff = new TransformBlock<int, string>(async i =>
    {
        Console.WriteLine($"Started with {i}");
        await DoSomethingAsync(i);
        return $"Done with {i}";
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

//This action block is your reporting block that uses the results from
//the transform block, and it will be executed in order.
var useStuff = new ActionBlock<string>(result =>
    {
        Console.WriteLine(result);
    });

//when linking make sure to propagate completion.
processStuff.LinkTo(useStuff, new DataflowLinkOptions { PropagateCompletion = true });

for (int i = 0; i < 5; i++)
{
    Console.WriteLine("Posting {0}", i);
    processStuff.Post(i);
}

//mark the top of your pipeline as complete, and that will propagate
//to the end.
processStuff.Complete();
//wait on your last block to finish processing everything.
await useStuff.Completion;

This code produced the following output as an example. Notice that the "Started with" statements are not necessarily in the order of the postings.

Posting 0
Posting 1
Posting 2
Posting 3
Posting 4
Started with 1
Started with 0
Started with 2
Started with 4
Started with 3
Done with 0
Done with 1
Done with 2
Done with 3
Done with 4

CodePudding user response:

I did, in the meantime, find a solution/workaround, by using two blocks, and passing the asynchronous Task from the first block to the next block, where it is waited for synchronously using .Wait().

So, like this:

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

internal static class Program
{
    public static async Task Main(string[] args)
    {
        var transformBlock = new TransformBlock<int, Task<int>>(async i =>
        {
            Console.WriteLine($"Started with {i}");
            await DoSomethingAsync(i);
            return i;
        });
        var actionBlock = new ActionBlock<Task<int>>(task =>
        {
            task.Wait();
            Console.WriteLine($"Done with {task.Result}");
        });

        transformBlock.LinkTo(actionBlock, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 0; i < 5; i++)
        {
            transformBlock.Post(i);
        }

        transformBlock.Complete();
        await actionBlock.Completion;
    }

    private static Task DoSomethingAsync(int i)
    {
        return Task.Delay(1000);
    }
}

This way the first block considers itself done with a message almost instantly. It can then handle the next message, in order, and call DoSomethingAsync for it right away, without waiting for the previous call to finish.
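
A possible variation on this workaround, sketched under a few assumptions: the second block awaits the task instead of blocking a thread with .Wait(), and both blocks take a BoundedCapacity so the number of started-but-unfinished messages stays limited. The class name, the RunAsync helper, the capacity parameter and the log list are hypothetical additions for illustration. As far as I understand the bounding rules, the limit on in-flight work ends up being roughly the sum of the two capacities, so treat that as an assumption to verify rather than a guarantee.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

internal static class AwaitingPipelineSketch
{
    // Hypothetical helper: runs the two-block pipeline and returns the log
    // lines in the order in which they were written.
    public static async Task<List<string>> RunAsync(
        int count, int capacity = DataflowBlockOptions.Unbounded)
    {
        var log = new List<string>();
        void Log(string line) { lock (log) { log.Add(line); } }

        // Non-async delegate: it starts the work and passes the running task on,
        // so the block is free to pick up the next message immediately.
        var transformBlock = new TransformBlock<int, Task<int>>(i =>
        {
            Log($"Started with {i}");
            return DoSomethingAsync(i);
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = capacity });

        // Awaits each task in arrival order without blocking a thread.
        var actionBlock = new ActionBlock<Task<int>>(async task =>
        {
            int i = await task;
            Log($"Done with {i}");
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = capacity });

        transformBlock.LinkTo(actionBlock,
            new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 0; i < count; i++)
        {
            // Post would return false when a bounded block is full;
            // SendAsync waits for free capacity instead.
            await transformBlock.SendAsync(i);
        }

        transformBlock.Complete();
        await actionBlock.Completion;
        return log;
    }

    private static async Task<int> DoSomethingAsync(int i)
    {
        await Task.Delay(1000);
        return i;
    }
}
```

With the default unbounded capacity, `await AwaitingPipelineSketch.RunAsync(5)` should log all five "Started with" lines followed by the five "Done with" lines in order, matching the output asked for in the question. With a small capacity the two kinds of lines interleave, because new messages are only accepted once earlier ones have been fully handled.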
