Home > Mobile >  How to notify failure in producer-consumer pattern using BlockingCollection?
How to notify failure in producer-consumer pattern using BlockingCollection?

Time:03-20

I'm trying to create a lifetime process that batches incoming messages for DB bulk insert. The new message is coming in 1 at a time, in an irregular interval. My solution to this would be something like the producer-consumer pattern using BlockingCollection. Messages are added freely into the BlockingCollection by various events and are taken out of BlockingCollection in bulk for DB insert in regular intervals, 5 seconds.

However, the current solution is fire-and-forget. If the bulk insert failed for any reason, I need a way for the processor to notify the original sources of the failure, because the source contains the logic to recover and retry.

Is there a specific pattern I should be using for what I'm trying to achieve? Any suggestion or help is much appreciated!

        private BlockingCollection<Message> _messageCollection;

        public async Task<bool> InsertMessage(Message message)
        {
            if (!_messageCollection.TryAdd(message)) return false;

            // TODO: check message has been successfully processed, if not return false
            // return false;

            return true;
        }

        private void BulkInsertProcess()
        {
            Task consumerThread = Task.Factory.StartNew(async () =>
            {
                while (!_messageCollection.IsCompleted)
                {
                    List<Message> messages = new List<Message>();

                    for (int i = 0; i < 50; i  )
                    {
                        if (_messageCollection.Any())
                        {
                            messages.Add(_messageCollection.Take());
                        }
                        else
                        {
                            break;
                        }
                    }

                    bool insertResult = await _database.BulkInsertMessages(messages);

                    // TODO: check result and inform the consumer if insert failed

                    await Task.Delay(5000);
                }
            });
        }

CodePudding user response:

You will have to associate somehow each Message with a dedicated TaskCompletionSource<bool>. You might want to make the second a property of the first:

public class Message
{
    public TaskCompletionSource<bool> TCS { get; } = new();
}

...or make the first a property of the second:

private class Entry : TaskCompletionSource<bool>
{
    public Message Message { get; init; }
}

...or create a custom class that contains both, or use a ValueTuple<Message, TaskCompletionSource<bool>> as I've chosen in the example below:

private BlockingCollection<(Message, TaskCompletionSource<bool>)> _queue;

public Task<bool> InsertMessage(Message message)
{
    var tcs = new TaskCompletionSource<bool>(
        TaskCreationOptions.RunContinuationsAsynchronously);
    if (!_queue.TryAdd((message, tcs)))
        return Task.FromResult(false);
    return tcs.Task;
}

private void BulkInsertProcess()
{
    Task consumerTask = Task.Run(async () =>
    {
        while (!_queue.IsCompleted)
        {
            var delayTask = Task.Delay(5000);

            var batch = new List<(Message, TaskCompletionSource<bool>)>();
            while (batch.Count < 50 && _queue.TryTake(out var entry))
                batch.Add(entry);

            if (batch.Count > 0)
            {
                var messages = batch.Select(e => e.Item1).ToList();
                bool insertResult = await _database.BulkInsertMessages(messages);

                foreach (var (message, tcs) in batch)
                    tcs.SetResult(insertResult);
            }
            await delayTask;
        }
    });
}

I made some improvements to your code, to make it work more smoothly:

  1. Task.Run instead of Task.Factory.StartNew. The former understands async delegates. The later doesn't.
  2. TryTake instead of Any. The Any is an extension method on the IEnumerable<T> interface, and these are not guaranteed to be thread-safe. Most probably it's thread-safe, but using a public member of the BlockingCollection<T> class is safer and more efficient.
  3. Create the Task.Delay before doing the bulk insert operation, and await it afterwards. This way you get a stable interval between subsequent bulk insert operations, that doesn't depend on the duration of the operations themselves.

In case you got 50 messages in one batch you might consider skipping the await delayTask altogether, because this indicates that your service is under pressure, and the messages are piling up in the queue.

  • Related