Update (Explanation of output)
I have multiple tasks running that all try to do a job however I only ever want one task to be doing or waiting to do that job.
In one scenario, if the job is in-progress, other tasks that attempt to do that same job will see that it's already in-progress and skip attempting to do it and move on immediately because they have no need to know if or when it actually finishes.
In another scenario, if the job is in-progress, other tasks should wait for the job to finish before continuing but then not attempt to do the job themselves because it was just completed by the task they were waiting on. Once the job is complete, all the tasks that were waiting may move on but none of them should attempt to do that job that was just done.
Original Question
My goal is to create an async Task
extension method that will generate results similar to this:
(not exactly because SKIP
and WAIT-TO-SKIP
won't be in exactly the same order every time)
When run with one thread:
[Task 1] LOCKED
[Task 1] WORK-START
[Task 1] WORK-END
[Task 1] UNLOCKED
When run with five threads:
[Task 1] LOCKED
[Task 1] WORK-START
[Task 2] WAIT-TO-SKIP
[Task 3] WAIT-TO-SKIP
[Task 4] WAIT-TO-SKIP
[Task 5] WAIT-TO-SKIP
[Task 1] WORK-END
[Task 1] UNLOCKED
[Task 2] SKIP
[Task 3] SKIP
[Task 4] SKIP
[Task 5] SKIP
It should also have a parameter that allows it to behave like this:
[Task 1] LOCKED
[Task 1] WORK-START
[Task 2] SKIP
[Task 3] SKIP
[Task 4] SKIP
[Task 5] SKIP
[Task 1] WORK-END
[Task 1] UNLOCKED
Here's the method I came up with:
public static async Task FirstThreadAsync(IFirstThread obj, Func<Task> action, TaskCompletionSource? waitTaskSource, string threadName = "")
{
if (obj.Locked)
{
if (waitTaskSource != null && !waitTaskSource.Task.IsCompleted)
{
Log.Debug(Logger, $"[{threadName}] WAIT-TO-SKIP");
await waitTaskSource.Task;
}
Log.Debug(Logger, $"[{threadName}] SKIP-1");
return;
}
var lockWasTaken = false;
var temp = obj;
try
{
if (waitTaskSource == null || waitTaskSource.Task.IsCompleted == false)
{
Monitor.TryEnter(temp, ref lockWasTaken);
if (lockWasTaken) obj.Locked = true;
}
}
finally
{
if (lockWasTaken) Monitor.Exit(temp);
}
if (waitTaskSource?.Task.IsCompleted == true)
{
Log.Debug(Logger, $"[{threadName}] SKIP-3");
return;
}
if (waitTaskSource != null && !lockWasTaken)
{
if (!waitTaskSource.Task.IsCompleted)
{
Log.Debug(Logger, $"[{threadName}] WAIT-TO-SKIP (LOCKED)");
await waitTaskSource.Task;
}
Log.Debug(Logger, $"[{threadName}] SKIP-2");
return;
}
Log.Debug(Logger, $"[{threadName}] LOCKED");
try
{
Log.Debug(Logger, $"[{threadName}] WORK-START");
await action.Invoke().ConfigureAwait(false);
Log.Debug(Logger, $"[{threadName}] WORK-END");
}
catch (Exception ex)
{
waitTaskSource?.TrySetException(ex);
throw;
}
finally
{
obj.Locked = false;
Log.Debug(Logger, $"[{threadName}] UNLOCKED");
waitTaskSource?.TrySetResult();
}
}
Here's the interface and the Example
class:
public interface IFirstThread
{
bool Locked { get; set; }
}
public class Example : IFirstThread
{
public bool Locked { get; set; }
public async Task DoWorkAsync(string taskName)
{
for (var i = 0; i < 10; i )
{
await Task.Delay(5);
}
}
}
Here's a unit test where the Task1 - Task5
are all trying to run at the same time and Task End
runs after they all completed:
[TestMethod]
public async Task DoWorkOnce_AsyncX()
{
var waitTaskSource = new TaskCompletionSource();
var example = new Methods.Example();
var tasks = new List<Task>
{
FirstThreadAsync(example, example.DoWorkAsync, waitTaskSource, "Task 1"),
FirstThreadAsync(example, example.DoWorkAsync, waitTaskSource, "Task 2"),
FirstThreadAsync(example, example.DoWorkAsync, waitTaskSource, "Task 3"),
FirstThreadAsync(example, example.DoWorkAsync, waitTaskSource, "Task 4"),
FirstThreadAsync(example, example.DoWorkAsync, waitTaskSource, "Task 5"),
};
await Task.WhenAll(tasks);
await FirstThreadAsync(example, example.DoWorkAsync, waitTaskSource, "Task End"),
//code to get and compare the output
}
The problem I'm having is that 99% of the time it works as expected but sometimes it allows two threads to run simultaneously, and I can't seem to figure out why or how to stop it. Here's an example of the unit test output from one the rare cases when it allows two threads to run simultaneously:
[Task 5] LOCKED,
[Task 4] WAIT-TO-SKIP,
[Task 2] LOCKED,
[Task 3] WAIT-TO-SKIP,
[Task 1] WAIT-TO-SKIP,
[Task 5] WORK-START,
[Task 2] WORK-START,
[Task 2] WORK-END,
[Task 5] WORK-END,
[Task 5] UNLOCKED,
[Task 2] UNLOCKED,
[Task 1] SKIP-1,
[Task 4] SKIP-1,
[Task 3] SKIP-1,
[Task End] LOCKED,
[Task End] WORK-START,
[Task End] WORK-END,
[Task End] UNLOCKED
As you can see Task 5
and Task 2
both get locked when only one of them should be getting locked. Task End
is only part of the test to verify that class is left in an unlocked state when the simultaneous calls are done. Also, the threadName
parameter is completely not necessary and only included so I can tell which task is which for testing purposes.
CodePudding user response:
You use Monitor.Exit
much too early. When in one thread
Monitor.TryEnter(temp, ref lockWasTaken);
is executed after
if (lockWasTaken) Monitor.Exit(temp);
was executed in another thread, both can have lockWasTaken
true.
You don't need the Locked
property on the object you use as the protected resource. The Monitor
class sets an internal marker in the object directly in an atomic matter. You can simplify your logic a lot by only relying on the Monitor
functionality without having your own separate flags.
Also, as noted by another user, we don't see you creating explicit separate threads for the tasks. The tasks can run on the same thread and then Monitor.TryEnter
will allow multiple tasks to enter at the same time when they run on the same thread.
See https://softwareengineering.stackexchange.com/questions/340414/when-is-it-safe-to-use-monitor-lock-with-task on why you should use SempahoreSlim
instead of Monitor
in async programming.
Here is some sample code solving the task with SemaphoreSlim
. I don't see the deeper purpose though ;)
private async void button1_Click(object sender, EventArgs e)
{
TaskCompletionSource? waitTaskSource = null; // or new TaskCompletionSource();
var example = new Example();
var sempahore = new SemaphoreSlim(1);
var tasks = new List<Task>
{
FirstThreadAsync(sempahore, example.DoWorkAsync, waitTaskSource, "Task 1"),
FirstThreadAsync(sempahore, example.DoWorkAsync, waitTaskSource, "Task 2"),
FirstThreadAsync(sempahore, example.DoWorkAsync, waitTaskSource, "Task 3"),
FirstThreadAsync(sempahore, example.DoWorkAsync, waitTaskSource, "Task 4"),
FirstThreadAsync(sempahore, example.DoWorkAsync, waitTaskSource, "Task 5"),
};
await Task.WhenAll(tasks);
await FirstThreadAsync(sempahore, example.DoWorkAsync, waitTaskSource, "Task End");
}
public static async Task FirstThreadAsync(SemaphoreSlim semaphore, Func<Task> action, TaskCompletionSource? waitTaskSource, string threadName = "")
{
// Try to acquire lock
bool lockAcquired = await semaphore.WaitAsync(TimeSpan.Zero);
if (lockAcquired)
{
// Lock acquired -> Do Work
Debug.WriteLine($"[{threadName}] LOCKED");
try
{
Debug.WriteLine($"[{threadName}] WORK-START");
await action.Invoke();
Debug.WriteLine($"[{threadName}] WORK-END");
semaphore.Release();
Debug.WriteLine($"[{threadName}] UNLOCKED");
}
catch (Exception ex)
{
waitTaskSource?.TrySetException(ex);
}
finally
{
waitTaskSource?.TrySetResult();
}
}
else // No lock acquired
{
// When source is specified, await it.
if(waitTaskSource != null)
{
Debug.WriteLine($"[{threadName}] WAIT-TO-SKIP");
await waitTaskSource.Task;
}
Debug.WriteLine($"[{threadName}] SKIP");
}
}
public class Example
{
public async Task DoWorkAsync()
{
for (var i = 0; i < 10; i )
{
await Task.Delay(5);
}
}
}
I have just seen your requirement that the work package should only be done once at all. To achieve that, you can just skip releasing the semaphore.
Fur further discussions we need to know more details on the actual requirements. Your current set of requirements seems a bit strange. Why have multiple tasks but only one work package?