Avoiding collisions when processing a list in parallel

Time:07-08

I'm not sure the title is the best, but I think the code below will clarify. For simplicity's sake, let's say I have a list of a class with two values, Category and ItemId. I need to process each item in the list, but I can't process two records with the same category at the same time (doing so ends up updating the same records in the database). The existing code groups the items by row number (partitioned by category). That way I can run the items for each group in parallel, because a group will never contain duplicate categories.

I feel like there is a better way to do this, but I'm not exactly sure what it is. Right now the order of processing might look like this:

Row Number 1: a, d, c, b, e

Row Number 2: a, b, c

Row Number 3: b, c

Row Number 4: c

It works, but what if category "e" takes a while? That row would be processing a single item when, in theory, it could be processing additional records.

Any thoughts? Or suggestions on how to better search for this (so far I've been coming up empty)?

In the code below I commented out the ".Dump()" calls, but if you run the code in LINQPad, those might be useful.

void Main()
{
    var list = new List<Item> {
        new Item("a", "1"),
        new Item("a", "2"),
        new Item("b", "3"),
        new Item("b", "4"),
        new Item("b", "5"),
        new Item("c", "6"),
        new Item("c", "7"),
        new Item("c", "8"),
        new Item("c", "9"),
        new Item("d", "10"),
        new Item("e", "11"), 
    };

    var itemsToProcess = list
        .OrderBy(l => l.Category)
        .GroupBy(i => i.Category)
        .Select( group => new { Group = group, Count = group.Count() } )
        .SelectMany( groupWithCount =>
            groupWithCount.Group.Select(b => b)
            .Zip(
                Enumerable.Range(1, groupWithCount.Count),
                (j, i) => new { j.Category, j.ItemId, RowNumber = i }
            )
        );
        
    // itemsToProcess.Dump();
    var itemsGroupedByRow = itemsToProcess.GroupBy(i => i.RowNumber);
    foreach (var items in itemsGroupedByRow)
    {
        // items.Key.Dump("Row Number");
        Parallel.ForEach(items,
            (item) =>
            {
                // do stuff
                // $"{item.Category} - {item.ItemId}".Dump();
            }
        );
    }
}

public class Item
{
    public Item(string category, string itemId)
    {
        Category = category;
        ItemId = itemId;
    }
    public string Category { get; set; }
    public string ItemId { get; set; }
}

CodePudding user response:

Think about this per category instead of trying to assemble processing units (your "rows") yourself. Basically: process the categories in parallel, but process the items within each category sequentially, ordered by item id.

Something like this (the Thread.Sleep illustrates that the long-running work for item "e" does not hold up the rest of the processing):

    public static void Run()
    {
        var list = new List<Item> {
            new ("a", "1"),
            new ("a", "2"),
            new ("b", "3"),
            new ("b", "4"),
            new ("b", "5"),
            new ("c", "6"),
            new ("c", "7"),
            new ("c", "8"),
            new ("c", "9"),
            new ("d", "10"),
            new ("e", "11")
        };

        var categories = list.Select(i => i.Category).Distinct();

        Parallel.ForEach(categories,
            (category) => ProcessCategory(list
                .Where(i => i.Category.Equals(category))
                .OrderBy(i => i.ItemId)
                .ToList()));


        Console.WriteLine("DONE!");
    }

    private static void ProcessCategory(List<Item> items)
    {
        foreach (var item in items)
        {
            if (item.Category == "e")
                Thread.Sleep(1000);
            Console.WriteLine($"{item.Category} - {item.ItemId}");
        }
    }
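The same scheme can also be written in a single `GroupBy` pass, which avoids the `Distinct()` plus one `Where()` scan per category. A sketch below (self-contained, so it restates `Item`): note that `ItemId` is a string, so a plain `OrderBy(i => i.ItemId)` sorts lexically ("10" before "2"); the `int.Parse` here assumes the ids are numeric, which may not hold for your real data.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public class Item
{
    public Item(string category, string itemId) { Category = category; ItemId = itemId; }
    public string Category { get; }
    public string ItemId { get; }
}

public static class CategoryRunner
{
    // One GroupBy pass replaces Distinct() plus a Where() scan per category.
    // Groups run in parallel; the items inside a group run sequentially on
    // one worker, so two items with the same category never overlap.
    public static void ProcessByCategory(IEnumerable<Item> items, Action<Item> process)
    {
        var groups = items
            .GroupBy(i => i.Category)
            // Assumes numeric ids; lexical OrderBy would put "10" before "2".
            .Select(g => g.OrderBy(i => int.Parse(i.ItemId)).ToList())
            .ToList();

        Parallel.ForEach(groups, group =>
        {
            foreach (var item in group)
                process(item);
        });
    }
}

public static class Program
{
    public static void Main()
    {
        var list = new List<Item> {
            new Item("a", "1"), new Item("a", "2"),
            new Item("b", "3"), new Item("d", "10"), new Item("e", "11")
        };
        CategoryRunner.ProcessByCategory(list,
            item => Console.WriteLine($"{item.Category} - {item.ItemId}"));
    }
}
```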