Custom Dataflow transform block with dynamic MaxDegreeOfParallelism

Time:12-12

I receive a sequence of objects (e.g., ItemGroup[]), each of which contains multiple jobs (e.g., Item[]) and a max degree of parallelism value, for example:

public record Item(string Name);
public record ItemGroup(Item[] Items, int MaxDegreeOfParallelism);

The sequence of ItemGroup instances must be processed sequentially, but the items inside a single ItemGroup may be processed concurrently, up to that group's MaxDegreeOfParallelism. For example, given the following groups, the pipeline should process the A* items one at a time, and then process the B* items concurrently:

var groups = new[]
{
    new ItemGroup(new[] { new Item("A0"), new Item("A1"), new Item("A2") }, 1),
    new ItemGroup(new[] { new Item("B0"), new Item("B1"), new Item("B2") }, 3)
};
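For comparison, the required semantics alone (groups strictly one after another, items inside a group running up to the group's limit) can be sketched without Dataflow. This is a minimal sketch assuming .NET 6 or later for `Parallel.ForEachAsync`; tuples stand in for the `Item`/`ItemGroup` records, and `Task.Delay` is simulated work. Unlike a dataflow block, it does not stream results to a downstream target.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Tuples stand in for the ItemGroup/Item records, so the snippet
// needs no type declarations.
var groups = new (string[] Items, int MaxDegreeOfParallelism)[]
{
    (new[] { "A0", "A1", "A2" }, 1),
    (new[] { "B0", "B1", "B2" }, 3),
};

var processed = new ConcurrentQueue<string>();

// Each group is awaited before the next one starts; inside a group,
// at most MaxDegreeOfParallelism items are in flight at the same time.
foreach (var group in groups)
{
    var options = new ParallelOptions { MaxDegreeOfParallelism = group.MaxDegreeOfParallelism };
    await Parallel.ForEachAsync(group.Items, options, async (item, ct) =>
    {
        await Task.Delay(10, ct); // simulated work
        processed.Enqueue(item);
    });
}

// The A* items appear in order; the order of the B* items may vary.
Console.WriteLine(string.Join(", ", processed));
```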

I thought a custom block implementing IPropagatorBlock<ItemGroup, Item> would be a good choice, but I am not sure how to properly wait on the TransformManyBlock that it creates internally, on the fly, as the producer posts ItemGroup instances to it.

Can anyone guide me here?

Answer:

You could create an inner TransformBlock<Item, Item> for each ItemGroup received. Below is a generalized solution with TInput, TChild and TOutput generic parameters: TInput corresponds to ItemGroup, TChild corresponds to Item, and TOutput is also Item, since you propagate the items without transforming them:

using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

public static IPropagatorBlock<TInput, TOutput> CreateTransformManyDynamicBlock
    <TInput, TChild, TOutput>(
    Func<TInput, IEnumerable<TChild>> childrenSelector,
    Func<TInput, int> degreeOfParallelismSelector,
    Func<TChild, TOutput> transformChild)
{
    ArgumentNullException.ThrowIfNull(childrenSelector);
    ArgumentNullException.ThrowIfNull(degreeOfParallelismSelector);
    ArgumentNullException.ThrowIfNull(transformChild);

    return new TransformManyBlock<TInput, TOutput>(async input =>
    {
        TransformBlock<TChild, TOutput> innerBlock = new(transformChild, new()
        {
            MaxDegreeOfParallelism = degreeOfParallelismSelector(input)
        });
        foreach (var child in childrenSelector(input))
        {
            bool accepted = innerBlock.Post(child);
            if (!accepted) break; // The innerBlock has failed
        }
        innerBlock.Complete();

        // Propagate the results
        List<TOutput> results = new();
        while (await innerBlock.OutputAvailableAsync().ConfigureAwait(false))
            while (innerBlock.TryReceive(out TOutput result))
                results.Add(result);
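        // The inner block has completed. If it faulted, rethrow all of its
        // errors: Wait() doesn't block here, it throws an AggregateException.
        try { await innerBlock.Completion.ConfigureAwait(false); }
        catch when (innerBlock.Completion.IsCanceled) { throw; }
        catch { innerBlock.Completion.Wait(); } // Propagate AggregateException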
        return results;
    });
}
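One design point worth noting: the inner TransformBlock keeps `EnsureOrdered` at its default of `true`, so each group's results are emitted in input order even when `MaxDegreeOfParallelism` is greater than 1. The standalone sketch below (item names and delays are illustrative only) makes the items finish in reverse order and drains the block with the same `OutputAvailableAsync`/`TryReceive` loop used above.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// A0 sleeps longest, so the items finish in reverse order.
var block = new TransformBlock<string, string>(async name =>
{
    int delayMs = name switch { "A0" => 300, "A1" => 200, _ => 100 };
    await Task.Delay(delayMs);
    return name;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3 });

foreach (var name in new[] { "A0", "A1", "A2" }) block.Post(name);
block.Complete();

// Drain the block, then observe its completion.
var output = new List<string>();
while (await block.OutputAvailableAsync())
    while (block.TryReceive(out var item))
        output.Add(item);
await block.Completion;

// EnsureOrdered (default true) restores input order.
Console.WriteLine(string.Join(", ", output)); // A0, A1, A2
```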

Usage example:

IPropagatorBlock<ItemGroup, Item> block =
    CreateTransformManyDynamicBlock<ItemGroup, Item, Item>(
        x => x.Items, x => x.MaxDegreeOfParallelism, x => x);

Note: The above code has not been tested.


Update: My original implementation (revision 1) was based on the .NET 7 API ReceiveAllAsync and on the TransformManyBlock constructor that takes a Func<TInput, IAsyncEnumerable<TOutput>> argument (also .NET 7). The problem was that ReceiveAllAsync doesn't propagate the exception of the enumerated dataflow block, so I switched to collecting and propagating the results manually, by filling a List<TOutput> as shown in this answer.
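If you would rather keep ReceiveAllAsync, one way around the missing exception is to await the block's `Completion` after the enumeration ends. This is a sketch assuming a runtime where `DataflowBlock.ReceiveAllAsync` is available; `ReceiveAllAndPropagateAsync` is a hypothetical helper name. Note that `await` rethrows only the first exception of a faulted block, whereas the `Completion.Wait()` call in the answer surfaces the whole AggregateException.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// A block that fails while processing its second item.
var source = new TransformBlock<int, int>(x =>
{
    if (x == 2) throw new InvalidOperationException("boom");
    return x * 10;
});
source.Post(1);
source.Post(2);
source.Complete();

var received = new List<int>();
Exception? caught = null;
try
{
    await foreach (var x in ReceiveAllAndPropagateAsync(source))
        received.Add(x);
}
catch (InvalidOperationException ex)
{
    caught = ex; // surfaced by awaiting Completion, not by ReceiveAllAsync
}
// How many items arrive before the fault is timing-dependent.
Console.WriteLine($"received {received.Count}, error: {caught?.Message}");

// Hypothetical helper: enumerates like ReceiveAllAsync, then awaits
// Completion so that a fault of the source reaches the consumer.
static async IAsyncEnumerable<T> ReceiveAllAndPropagateAsync<T>(IReceivableSourceBlock<T> source)
{
    await foreach (var item in source.ReceiveAllAsync().ConfigureAwait(false))
        yield return item;
    await source.Completion.ConfigureAwait(false);
}
```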
