Home > database >  Microsoft TPL Dataflow Library | MaxDegreeOfParallelism and IO-bound work
Microsoft TPL Dataflow Library | MaxDegreeOfParallelism and IO-bound work

Time:12-17

My use case is this: send 100,000 web requests to our application server and wait the results. Here, most of the delay is IO-bound, not CPU-bound, so I understand the Dataflow libraries may not be the best tool for this. I've managed to use it will a lot of success and have set the MaxDegreeOfParallelism to the number of requests that I trust the server to be able to handle, however, since this is the maximum number of tasks, it's no guarantee that this will actually be the number of tasks running at any time.

The only bit of information I could find in the documentation is this:

Because the MaxDegreeOfParallelism property represents the maximum degree of parallelism, the dataflow block might execute with a lesser degree of parallelism than you specify. The dataflow block can use a lesser degree of parallelism to meet its functional requirements or to account for a lack of available system resources. A dataflow block never chooses a greater degree of parallelism than you specify.

This explanation is quite vague on how it actually determines when to spin up a new task. My hope was that it will recognize that the task is blocked due to IO, not any system resources, and it will basically stay at the maximum degrees of parallelism for the entire duration of the operation.

However, after monitoring a network capture, it seems to be MUCH quicker in the beginning and slower near the end. I can see from the capture, that at the beginning it does reach the maximum as specified. The TPL library doesn't have any built-in way to monitor the current number of active threads, so I'm not really sure of the best way to investigate further on that end.

My implementation:

   internal static ExecutionDataflowBlockOptions GetDefaultBlockOptions(int maxDegreeOfParallelism,
        CancellationToken token) => new()
        {
            MaxDegreeOfParallelism = maxDegreeOfParallelism,
            CancellationToken = token,
            SingleProducerConstrained = true,
            EnsureOrdered = false
        };


    private static async ValueTask<T?> ReceiveAsync<T>(this ISourceBlock<T?> block, bool configureAwait, CancellationToken token)
    {
        try
        {
            return await block.ReceiveAsync(token).ConfigureAwait(configureAwait);
        } catch (InvalidOperationException)
        {
            return default;
        }
    }

    internal static async IAsyncEnumerable<T> YieldResults<T>(this ISourceBlock<T?> block, bool configureAwait,
        [EnumeratorCancellation]CancellationToken token)
    {
        while (await block.OutputAvailableAsync(token).ConfigureAwait(configureAwait))
            if (await block.ReceiveAsync(configureAwait, token).ConfigureAwait(configureAwait) is T result)
                yield return result;

        // by the time OutputAvailableAsync returns false, the block is gauranteed to be complete. However,
        // we want to await it anyway, since this will propogate any exception thrown to the consumer.
        // we don't simply await the completed task, because that wouldn't return all aggregate exceptions,
        // just the last to occur
        if (block.Completion.Exception != null)
            throw block.Completion.Exception;
    }

    public static IAsyncEnumerable<TResult> ParallelSelectAsync<T, TResult>(this IEnumerable<T> source, Func<T, Task<TResult?>> body,
        int maxDegreeOfParallelism = DataflowBlockOptions.Unbounded, TaskScheduler? scheduler = null, CancellationToken token = default)
    {
        var options = GetDefaultBlockOptions(maxDegreeOfParallelism, token);

        if (scheduler != null)
            options.TaskScheduler = scheduler;

        var block = new TransformBlock<T, TResult?>(body, options);

        foreach (var item in source)
            block.Post(item);

        block.Complete();

        return block.YieldResults(scheduler != null && scheduler != TaskScheduler.Default, token);
    }

So, basically, my question is this: when an IO-bound action is executed in a TPL Dataflow block, how can I ensure the block stays at the MaxDegreeOfParallelism that is set?

CodePudding user response:

On the contrary, Dataflow is great at IO work and perfect for this scenario. DataFlow architectures work by creating pipelines similar to Bash or PowerShell pipelines. Each block acting as a separate command, reading messages from its input queue and passing them to the next block through its output queue. That's why the default DOP is 1 - parallelism and concurrency come from using multiple commands/blocks, not a fat block with a high DOP

This is a simplified example of what I use at work request daily sales reports from about a hundred airlines (BSPs for those that know about air tickets), parse the reports and then download individual ticket records, before importing everything into the database.

In this case the head block downloads content with a DOP=10, then the parser block parses the responses one at a time. The downloader is IO-bound so it can make a lot more requests than there are cores, as many as the services allow, or the application wants to handle.

The parser on the other hand is CPU bound. A high DOP would lock a lot of core which would harm not just the application, but other processes as well.

// Create the blocks
var dlOptions = new ExecutionDataflowBlockOptions {
    MaxDegreeOfParallelism=10
};
var downloader=new TransformBlock<string,string>(
                     url => _client.GetStringAsync(url,cancellationToken),
                     dlOptions);
var parser=new TransformBlock<string,Something>(ParseIntoSomething);
var importer=new ActionBlock<Something>(ImportInDb);

// Link the blocks
var linkOptions = new DataflowLinkOptions {PropagateCompletion = true};
downloader.LinkTo(parser,linkOptions);
parser.LinkTo(importer,linkOptions);

After building this 3 step pipeline I post URLs at the front and expect the tail block to complete

foreach(var url in urls)
{
    downloader.Post(url);
}

downloader.Complete();
await importer.Completion;

There are a lot of improvements to this. Right now, if the downloader is faster than the parser, all the content will be buffered in memory. In a long running pipeline this can easily take up all available memory.

A simple way to avoid this is add BoundedCapacity=N to the parser block options. If the parser input buffer is full, upstream blocks, in this case the downloader, will pause and wait until a slot becomes available :

var parserOptions = new ExecutionDataflowBlockOptions {
    BoundedCapacity=2,
    MaxDegreeOfParallelism=2,
};
var parser=new TransformBlock<string,Something>(ParseIntoSomething, parserOptions);
  • Related