Home > Mobile >  .NET 4.8 Why is async not allowing for parallel processing unless I manually create new threads?
.NET 4.8 Why is async not allowing for parallel processing unless I manually create new threads?

Time:06-23

I wrote a Windows service, and I would like it to work using the same exact logic that it currently has, but process everything in parallel.

The real codebase is fairly abstracted and private, so I can't post the source here but heres the jist of it.

The app is a persistent process scheduler. It leverages EntityFramework 6 to scan a database for records detailing (among other things): 1.) a path for a process to run, 2.) a date/time to run the process and 3. the scheduled frequency that it is on.

Basic Functionality

  1. It loops through the database for active records and returns all scheduled job details

  2. Checks the date and time against the current date and time, within a buffer

  3. If the job should run, it has been using either new Process().Start(...) with the path from the record, initializes the process if the file is found and is executable, and then wait for an exit or the configured timeout threshold to elapse

  4. The exit code or lack of one (in the event of hanging processes) for each process run is what single-handedly determines if the record remains active and continues to get cycled and re-scheduled dynamically into the future, or instead, deactivated with errors logged to the associated record in the DB.

  5. The process continues perpetually unless explicitly stopped.

Currently Working in Parallel (1000% faster, but it looks like it is possibly skipping records!). Maybe I need to add a lock before accessing the db?

As it turns out I was using (var process) {...} and it was throwing that it was being disposed. After staring at the code a few days, I saw this stupid mistake I had made trying to be tidy ;p

var tasks = new List<Thread>;

schedules.ForEach(schedule => {
  // I have also tried ThreadPool.QueueUserWorkerItem(...) but then I read its basically long-hand for Task.Run() and I don't think it was working the same as new Thread using this pattern.

  var thread = new Thread(() => await ProcessSchedule(schedule));

  // Actually using slim semaphore in wild, but for simplicty sake...
  thread.Start();
  threads.Add(thread);
});

// Before exiting...
while (!threads.All(instance => !instance.IsAlive))
{
  await Delay(debounceValue);

  continue;
}

Working in Sequence without Issue Besides it's Blocking Slowness...

var tasks = new List<Task>;

schedules.ForEach(schedule => {
  // I have also tried to just await this here, but that obviously will block anything on the same thread, so I add the tasks to a list a wait for completion after the loop is complete and the processes are working on their own process. 
  tasks.Add(ProcessSchedule(schedule));
});

// Before exiting... 
// I expected this to work but it still seems to go record by record :*(
// Also tried using Task.Run(() => await task(...)) with no luck...
await Task.WhenAll(tasks);

Note: I am passing the list of tasks or threads up another level in the real code so it can process and be awaited on while everything is working but this is some simplified borderline-psuedo code strictly for demonstrating the concept I am struggling with as concise as possible

Inside of ProcessSchedule

Async method which starts a new process and waits for an exit. When one is received, a success or exit is written to the database using EntityFramework 6 on the schedule record which drove the process for this instance being parsed and evaluated. EG:

 new Process(startInfo).Start();

 // Monitor process, persist exit state via 

dbContext.SaveChangesAsync(); 
process.StandardError  = handleProcExitListener;
process.StandardOutput  = handleProcExitListener;
process.Exited  = (...) => handleProcExitListener(...); 

I can say that:

I have no non-awaited async methods unless its using await Task.Run(MethodAsync), is in Main(argz) await Task.WhenAll(task);, etc.

Is async-await blocking me because DbContext is not thread safe by default or something? If this is the case, would someone please verify how I can achieve what I am looking for?

I have tried a number of techniques, but I am not able to get the application to run each process simultaneously, then wait and react upon the end state after spawning the processes, unless I use multithreading (new Thread directly maybe also using ThreadPool).

I have not had to resort to using threads in a while, mostly since the introduction of async-await in C#. Therefore, I am questioning myself using it without first fully understanding why. I would really appreciate some help grasping what I am missing.

It seems to me async is just a fancy pattern and facades for easy access to state-machine characteristics. Why then when I researched the subject did I just read that using ThreadPool.QueueUserWorkerItem(...) is rather obsolete since TPL async/await. If async/await does not give you new threads to work with, is running a process in parallel possible without it? Also, these processes take anywhere from 10min to 45min each to run. So you can see the importance of running them all together.

Since I am stuck with .NET 4.8, I, unfortunately, cannot use the async version of WaitForExitAsync() introduced in v5 .

Solution I modeled a solution from the following Async process start and wait for it to finish

public static Task<bool> WaitForExitAsync(this Process process, TimeSpan timeout)
    {
        ManualResetEvent processWaitObject = new ManualResetEvent(false);
        processWaitObject.SafeWaitHandle = new SafeWaitHandle(process.Handle, false);

        TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>();

    RegisteredWaitHandle registeredProcessWaitHandle = null;
    registeredProcessWaitHandle = ThreadPool.RegisterWaitForSingleObject(
        processWaitObject,
        delegate(object state, bool timedOut)
        {
            if (!timedOut)
            {
                registeredProcessWaitHandle.Unregister(null);
            }

            processWaitObject.Dispose();
            tcs.SetResult(!timedOut);
        },
        null /* state */,
        timeout,
        true /* executeOnlyOnce */);

    return tcs.Task;
}

CodePudding user response:

Even though you have omitted some of the Process code, I'm assuming that you are calling the blocking method Process.WaitForExit instead of the async equivalent of it. I have created a mock type solution and this runs in parallel.


private static async Task RunPowershellProcess()
{
    using var process = new Process();
    process.StartInfo.FileName = @"C:\windows\system32\windowspowershell\v1.0\powershell.exe";
    process.StartInfo.UseShellExecute = true;
    process.Exited  = (a, _) =>
    {
        var p = a as Process;
        Console.WriteLine(p?.ExitCode);
    };
    process.EnableRaisingEvents = true;
    process.Start();
    await process.WaitForExitAsync();
}

static async Task Main(string[] args)
{
    var tasks = new List<Task>(10);
    for (var x = 0; x < 10; x  )
    {
        tasks.Add(RunPowershellProcess());
    }

    await Task.WhenAll(tasks);
}



CodePudding user response:

Solution

I modeled a solution from the following async process start extension method in place of the relic .NET method.

Async process start and wait for it to finish

I also found some exceptions being buried from using the process and it being disposed of when exit events fired.

public static Task<bool> WaitForExitAsync(this Process process, TimeSpan timeout)
    {
        ManualResetEvent processWaitObject = new ManualResetEvent(false);
        processWaitObject.SafeWaitHandle = new SafeWaitHandle(process.Handle, false);

    TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>();

RegisteredWaitHandle registeredProcessWaitHandle = null;
registeredProcessWaitHandle = ThreadPool.RegisterWaitForSingleObject(
    processWaitObject,
    delegate(object state, bool timedOut)
    {
        if (!timedOut)
        {
            registeredProcessWaitHandle.Unregister(null);
        }

        processWaitObject.Dispose();
        tcs.SetResult(!timedOut);
    },
    null /* state */,
    timeout,
    true /* executeOnlyOnce */);

  return tcs.Task;
}
  • Related