I have a worker service which will perform image processing using Cv2 Dnn. This is optimized to use all available CPU, and running multiple instances of it kills performance. While it runs very fast, if two images are added at the same time it can cause Dnn to crash.
For that reason I would like to have a single background worker which reads from a queue sequentially. All incoming requests will add their images to this queue and wait for the result.
The workflow I am hoping to achieve:
- Receive images from the API controller, push the image to a queue and wait
- Background worker reads from the queue sequentially, processes the image, returns the result
- Controller receives the processing result and returns the image
So far I have seen many ways to push an item to a queue to be processed, basically as fire-and-forget, but nothing on how to wait for the item to finish processing. For example, in Microsoft's documentation on background tasks with hosted services, the queued background task example simply pushes a number of tasks to the queue and has a service read them as they are added, but the calling code does not wait for the task to finish.
CodePudding user response:
You can use TaskCompletionSource for that purpose. Consider this simple background service with a single background thread:
using System;
using System.Threading.Tasks;
using System.Threading;
using System.Collections.Concurrent;

public class BackgroundService
{
    // thread-safe queue; Take() blocks the worker until an item is available
    private readonly BlockingCollection<ImageQueueItem> _imagesToProcess = new();

    public BackgroundService()
    {
        // IsBackground = true so this thread does not keep the process alive on shutdown
        var thread = new Thread(ProcessImagesThread) { IsBackground = true };
        thread.Start();
    }

    public Task<ImageProcessingResult> ProcessImageQueuedAsync(Image image)
    {
        // RunContinuationsAsynchronously keeps the caller's continuation off the worker thread
        var taskCompletionSource = new TaskCompletionSource<ImageProcessingResult>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        var queueItem = new ImageQueueItem
        {
            ImageToProcess = image,
            // must be the same source whose Task is returned below,
            // otherwise the caller's await never completes
            ProcessingResult = taskCompletionSource,
        };
        _imagesToProcess.Add(queueItem);
        return taskCompletionSource.Task;
    }

    // background thread that processes images one at a time
    private void ProcessImagesThread()
    {
        while (true)
        {
            var queueItem = _imagesToProcess.Take();
            var image = queueItem.ImageToProcess;
            var result = new ImageProcessingResult();
            try
            {
                // .............................................................
                // process image here and set some properties to result variable
                // .............................................................
                // complete task by setting result
                queueItem.ProcessingResult.SetResult(result);
            }
            catch (Exception e)
            {
                // or fail task if processing failed
                queueItem.ProcessingResult.SetException(e);
            }
        }
    }
}

// Some data about image to process
public class Image {}

// Some data to return from controller after processing image
public class ImageProcessingResult {}

public class ImageQueueItem
{
    public Image ImageToProcess { get; set; }
    public TaskCompletionSource<ImageProcessingResult> ProcessingResult { get; set; }
}
This service can be used like this:
// awaiting this waits until the image has been processed
var result = await backgroundService.ProcessImageQueuedAsync(image);
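If you would rather not dedicate a blocked thread to the queue, the same idea can probably be expressed with System.Threading.Channels. The following is only a rough sketch under assumptions, not the code from the answer above: SequentialImageQueue, WorkItem, EnqueueAsync and RunAsync are hypothetical names, the image is stood in for by an int id, the result by a string, and Task.Delay is a placeholder for the real Dnn call. It assumes .NET 5 or later.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical work item: the input plus the TaskCompletionSource the caller awaits.
public sealed record WorkItem(int ImageId, TaskCompletionSource<string> Completion);

public sealed class SequentialImageQueue
{
    // Unbounded channel read by a single consumer, so items are handled one at a time.
    private readonly Channel<WorkItem> _channel =
        Channel.CreateUnbounded<WorkItem>(new UnboundedChannelOptions { SingleReader = true });

    // Called by request handlers: enqueue the item and return a task that
    // completes once the consumer loop has processed it.
    public Task<string> EnqueueAsync(int imageId)
    {
        var completion = new TaskCompletionSource<string>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        if (!_channel.Writer.TryWrite(new WorkItem(imageId, completion)))
        {
            // TryWrite only fails here if the writer has been completed.
            completion.SetException(new InvalidOperationException("Queue is closed."));
        }
        return completion.Task;
    }

    // The single consumer loop; start it once for the lifetime of the application.
    public async Task RunAsync(CancellationToken stoppingToken)
    {
        await foreach (var item in _channel.Reader.ReadAllAsync(stoppingToken))
        {
            try
            {
                // Placeholder for the real image processing call (assumption).
                await Task.Delay(10, stoppingToken);
                item.Completion.TrySetResult($"processed-{item.ImageId}");
            }
            catch (Exception e)
            {
                // Propagate the failure to the caller that is awaiting this item.
                item.Completion.TrySetException(e);
            }
        }
    }

    // Signals that no more items will be written, which lets RunAsync finish.
    public void Complete() => _channel.Writer.Complete();
}

public static class Program
{
    public static async Task Main()
    {
        var queue = new SequentialImageQueue();
        var worker = queue.RunAsync(CancellationToken.None);

        // Three "requests" arrive together; each one awaits only its own result.
        var results = await Task.WhenAll(
            Enumerable.Range(1, 3).Select(id => queue.EnqueueAsync(id)));
        Console.WriteLine(string.Join(", ", results));

        queue.Complete();
        await worker;
    }
}
```

In an ASP.NET Core application, one plausible wiring is to register the queue with AddSingleton, call RunAsync from the ExecuteAsync method of a hosted service registered with AddHostedService, and have the controller action await EnqueueAsync. Because the queue is unbounded, a burst of requests will pile up in memory; Channel.CreateBounded may be a better fit if you need back-pressure.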