How do I update shared state with Parallel.ForEach

Time:11-04

I have a WPF app that reads an Outlook .pst file, extracts each message, and saves both it and any attachments as .pdf files. After that's all done, it does some other processing on the files.

I'm currently using a plain old foreach loop for the first part. Here is a rather simplified version of the code...

// These two are used by the WPF UI to display progress
string BusyContent;
ObservableCollection<string> Msgs = new();
// See note lower down about the quick-and-dirty logging
string _logFile = @"C:\Path\To\LogFile.log";
// _allFiles is used to keep a record of all the files we generate. Used after the loop ends
List<string> _allFiles = new();
// nCurr is used to update BusyContent, which is bound to the UI to show progress
int nCurr = 0;
// The messages would really be extracted from the .pst file. Empty list used for simplicity
List<Message> messages = new();

async Task ProcessMessages() {
  using StreamWriter logFile = new(_logFile, true);
  foreach (Message msg in messages) {
    nCurr++;
    string fileName = GenerateFileName(msg);
    // We log a lot more, but only one shown for simplicity
    Log(logFile, $"File: {fileName}");
    _allFiles.Add(fileName);
    // Let the user know where we are up to
    BusyContent = $"Processing message {nCurr}";
    // Msgs is bound to a WPF grid, so we need to use Dispatcher to update
    Application.Current.Dispatcher.Invoke(() => Msgs.Add(fileName));
    // Finally we write out the .pdf files
    await ProcessMessage(msg);
  }
}

async Task ProcessMessage(Message msg) {
  // The methods called here are omitted as they aren't relevant to my questions
  await GenerateMessagePdf(msg);
  foreach(Attachment a in msg.Attachments) {
    string fileName = GenerateFileName(a);
    // Note that we update _allFiles here as well as in the main loop
    _allFiles.Add(fileName);
    await GenerateAttachmentPdf(a);
  }
}

static void Log(StreamWriter logFile, string msg) =>
  logFile.WriteLine(DateTime.Now.ToString("yyMMdd-HHmmss.fff") + " - " + msg);

This all works fine, but can take quite some time on a large .pst file. I'm wondering if converting this to use Parallel.ForEach would speed things up. I can see the basic usage of this method, but have a few questions, mainly concerned with the class-level variables that are used within the loop...

  1. The logFile variable is passed around. Will this cause issues? This isn't a major problem, as this logging was added as a quick-and-dirty debugging device, and really should be replaced with a proper logging framework, but I'd still like to know if what I'm doing would be an issue in the parallel version

  2. nCurr is updated inside the loop. Is this safe, or is there a better way to do this?

  3. _allFiles is also updated inside the main loop. I'm only adding entries, not reading or removing, but is this safe?

  4. Similarly, _allFiles is updated inside the ProcessMessage method. I guess the answer to this question depends on the previous one.

  5. Is there a problem updating BusyContent and calling Application.Current.Dispatcher.Invoke inside the loop?

Thanks for any help you can give.

CodePudding user response:

First, you need to switch to thread-safe collections:

ObservableConcurrentCollection<string> Msgs = new();
ConcurrentQueue<string> _allFiles = new();

ObservableConcurrentCollection can be installed through NuGet. ConcurrentQueue lives in the System.Collections.Concurrent namespace. Special thanks to Theodor Zoulias for pointing out that there is a better option than ConcurrentBag.
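As a rough illustration of why the collection type matters, here is a minimal console sketch. The `allFiles` variable and the generated file names are stand-ins for the question's `_allFiles`, not the real data. Many iterations enqueue at the same time and no entry is lost, which `List<string>.Add` does not guarantee.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

// Stand-in for _allFiles: written to from many threads at once
ConcurrentQueue<string> allFiles = new();

Parallel.ForEach(Enumerable.Range(0, 10_000), i =>
{
    // Enqueue is safe to call concurrently; List<string>.Add is not
    allFiles.Enqueue($"file-{i}.pdf");
});

// Every iteration added exactly one distinct entry
int total = allFiles.Count;
int distinct = allFiles.Distinct().Count();
Console.WriteLine($"{total} entries, {distinct} distinct");
```

Since the question only adds entries during the loop and reads them afterwards, a queue is enough here; it also keeps entries in insertion order per thread.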

After that, you can use either Parallel.ForEach or tasks.

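Parallel.ForEach uses a Partitioner, which lets it avoid creating more tasks than necessary while still running the loop body in parallel. Its body is a synchronous delegate, so an async lambda is treated as async void and the loop does not wait for anything that happens after the first await. It is therefore better to remove the async and await keywords from the methods that run inside Parallel.ForEach.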
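To see why async bodies and Parallel.ForEach do not mix well, consider this small console sketch. The `gate` and `finished` names are made up for the demonstration. The gate is only opened after the loop has returned, so any work placed after the `await` cannot have run by then.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Gate that stays closed until Parallel.ForEach has already returned
var gate = new TaskCompletionSource();
int finished = 0;

// The async lambda is converted to an async void Action<int>
Parallel.ForEach(Enumerable.Range(0, 10), async i =>
{
    await gate.Task;                      // control returns to the loop here
    Interlocked.Increment(ref finished);  // runs only once the gate is opened
});

// The loop has "completed", yet none of the bodies got past their await
int finishedWhenLoopReturned = Volatile.Read(ref finished);
Console.WriteLine($"Finished when loop returned: {finishedWhenLoopReturned}");

// Open the gate so the pending continuations can run
gate.SetResult();
```

In the real code the awaited operation would be the PDF generation, so the loop could report completion while files are still being written.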

    async Task  ProcessMessages()
    {
        using StreamWriter logFile = new(_logFile, true);

        await Task.Run(() => {
            Parallel.ForEach(messages, msg =>
            {
                var currentCount = Interlocked.Increment(ref nCurr);
                string fileName = GenerateFileName(msg);
                Log(logFile, $"File: {fileName}");
                _allFiles.Enqueue(fileName);
                BusyContent = $"Processing message {currentCount}";
                ProcessMessage(msg);
            });
        });
    }
    
    
    int ProcessMessage(Message msg)
    {
        // The methods called here are omitted as they aren't relevant to my questions
        var message = GenerateMessagePdf(msg);
        foreach (Attachment a in msg.Attachments)
        {
            string fileName = GenerateFileName(a);                
            _allFiles.Enqueue(fileName);
            GenerateAttachmentPdf(a);
        }
        return msg.Id;
    }


    private string GenerateAttachmentPdf(Attachment a) => string.Empty;


    private string GenerateMessagePdf(Message message) => string.Empty;


    string GenerateFileName(Attachment attachment) => string.Empty;


    string GenerateFileName(Message message) => string.Empty;


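    void Log(StreamWriter logFile, string msg)
    {
        // StreamWriter is not thread-safe, so serialize writes from parallel iterations
        lock (logFile)
        {
            logFile.WriteLine(DateTime.Now.ToString("yyMMdd-HHmmss.fff") + " - " + msg);
        }
    }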
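
The counter and the log are the two pieces of shared state that are not collections. Here is a minimal sketch of one way to handle both, assuming a `StringWriter` as a stand-in for the log file and a hypothetical `logLock` object: `Interlocked.Increment` hands each iteration a unique number, and a lock keeps writes to the shared writer from interleaving.

```csharp
using System;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

int nCurr = 0;
object logLock = new();
using StringWriter log = new();   // stand-in for the StreamWriter on the log file

Parallel.ForEach(Enumerable.Range(0, 1_000), i =>
{
    // Atomic increment: no two iterations ever see the same value
    int current = Interlocked.Increment(ref nCurr);

    // The writer is not thread-safe, so only one thread writes at a time
    lock (logLock)
    {
        log.WriteLine($"Processing message {current}");
    }
});

int lineCount = log.ToString()
    .Split(Environment.NewLine, StringSplitOptions.RemoveEmptyEntries)
    .Length;
Console.WriteLine($"Counter: {nCurr}, log lines: {lineCount}");
```

A plain `nCurr++` is a read-modify-write and can lose updates under contention, which is why the answer uses the value returned by `Interlocked.Increment` for the progress text.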
    

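Another way is to start a task per message and await them all. In this case there is no need to remove the async and await keywords, so ProcessMessage stays asynchronous. For the `var msgs = await Task.WhenAll(...)` line to compile, it has to return a Task<T> (for example Task<int>, mirroring the int returned above) rather than a plain Task.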

    async Task ProcessMessages()
    {
        using StreamWriter logFile = new(_logFile, true);            
        var messageTasks = messages.Select(msg =>
        {                
            var currentCount = Interlocked.Increment(ref nCurr);
            string fileName = GenerateFileName(msg);                
            Log(logFile, $"File: {fileName}");
            _allFiles.Enqueue(fileName);                
            BusyContent = $"Processing message {currentCount}";
            return ProcessMessage(msg);
        });

        var msgs = await Task.WhenAll(messageTasks);
    }
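
For a self-contained picture of the Task.WhenAll variant, here is a small console sketch. `ProcessMessageAsync` is a hypothetical stand-in that simulates the PDF generation with `Task.Delay`, and integer ids replace the real `Message` objects.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

ConcurrentQueue<string> allFiles = new();

// Hypothetical stand-in for the real async ProcessMessage
async Task<int> ProcessMessageAsync(int id)
{
    await Task.Delay(10);                    // simulates asynchronous PDF generation
    allFiles.Enqueue($"message-{id}.pdf");   // may run on any thread-pool thread
    return id;
}

// Select starts one task per message; WhenAll completes once all have finished
// and returns the results in the same order as the input tasks
int[] ids = await Task.WhenAll(
    Enumerable.Range(1, 20).Select(id => ProcessMessageAsync(id)));

Console.WriteLine($"Processed {ids.Length} messages, recorded {allFiles.Count} files");
```

Note that this starts all the messages at once. For a very large .pst file some form of throttling may be worth considering, which is something Parallel.ForEach does for you.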