I have the need to inspect a dead letter queue, and if some condition exists (like older than 30 days) I want to archive it to some data store (not just remove it). So I was gonna grab the messages, if it meets this condition, save it to some store and complete/delete the message, if not, abandoning it. I have a console app where I’m grabbing the messages from the dlq, it seems to work, but if I run it over and over again, I’m seeing inconsistent results in the number of messages that get returned. It will have all of them for a few iterations (in my example it would be 7), but then it will start only getting 6, 0 or 1, and eventually go back to the full amount that’s in the dql (like 30 seconds later which I think is the default lock period for peek lock). I would assume that every time I run this, I should get all messages, cause I’m abandoned the messages the run before.
I'm using Azure.Messaging.ServiceBus 7.8.1 and seems like you just pass the message object to the abandon method. If anyone has any suggestion that would be great!
Code in github: https://github.com/ndn2323/bustest
using Azure.Messaging.ServiceBus;
using System.Text;
namespace BusReceiver
{
public class TaskRunner
{
public TaskRunner() { }
public async Task Run() {
const string DLQPATH = "/$deadletterqueue";
var maxMsgCount = 50;
var connectionString = "[ConnectionString]";
var topicName = "testtopic1";
var subscriberName = "testsub1";
var subscriberDlqName = subscriberName DLQPATH;
var client = new ServiceBusClient(connectionString);
var options = new ServiceBusReceiverOptions();
options.ReceiveMode = ServiceBusReceiveMode.PeekLock;
var receiver = client.CreateReceiver(topicName, subscriberName, options);
var receiverDlq = client.CreateReceiver(topicName, subscriberDlqName, options);
Log("Starting receive from regular queue");
var msgList = await receiver.ReceiveMessagesAsync(maxMsgCount, TimeSpan.FromMilliseconds(500));
Log(msgList.Count.ToString() " messages found");
foreach (var msg in msgList)
{
await receiver.DeadLetterMessageAsync(msg);
}
Log("Starting receive from dead letter queue");
var msgListDlq = await receiverDlq.ReceiveMessagesAsync(maxMsgCount, TimeSpan.FromMilliseconds(500));
Log(msgListDlq.Count.ToString() " messages found in dlq");
foreach (var msg in msgListDlq) {
Log("MessageId: " msg.MessageId " Body: " Encoding.ASCII.GetString(msg.Body));
// if some condition, archieve message to some data store, else abandon it to be picked up again
// for this test I'm abandoning all messages
await receiverDlq.AbandonMessageAsync(msg);
}
await receiver.CloseAsync();
await receiverDlq.CloseAsync();
}
private void Log(string msg) {
Console.WriteLine(DateTime.Now.ToString() ": " msg);
}
}
}
Example of output:
C:\GitHub\ndn2323\bustest\BusReceiver\bin\Debug\net6.0>BusReceiver.exe
5/29/2022 11:45:36 PM: Starting receive from regular queue
5/29/2022 11:45:37 PM: 0 messages found
5/29/2022 11:45:37 PM: Starting receive from dead letter queue
5/29/2022 11:45:37 PM: 7 messages found in dlq
5/29/2022 11:45:37 PM: MessageId: 9e9f390655af44a8b93866920a6de77c Body: TestMessage
5/29/2022 11:45:37 PM: MessageId: 3aacffe40ab5473fb34412684bcd1907 Body: TestMessage
5/29/2022 11:45:37 PM: MessageId: a47f83d4a12845088ade427e084d8e39 Body: TestMessage
5/29/2022 11:45:37 PM: MessageId: 47ff6dd4f4134661a3616a9210670be5 Body: TestMessage
5/29/2022 11:45:37 PM: MessageId: d10b3602f57047f1bf613675e35793e0 Body: TestMessage
5/29/2022 11:45:37 PM: MessageId: 08a45405375e46ffb99db9812c3e3d78 Body: TestMessage
5/29/2022 11:45:37 PM: MessageId: d21cff4ae5b6453f9077b3805ace4e09 Body: TestMessage
C:\GitHub\ndn2323\bustest\BusReceiver\bin\Debug\net6.0>BusReceiver.exe
5/29/2022 11:45:42 PM: Starting receive from regular queue
5/29/2022 11:45:43 PM: 0 messages found
5/29/2022 11:45:43 PM: Starting receive from dead letter queue
5/29/2022 11:45:43 PM: 7 messages found in dlq
5/29/2022 11:45:43 PM: MessageId: 9e9f390655af44a8b93866920a6de77c Body: TestMessage
5/29/2022 11:45:43 PM: MessageId: 3aacffe40ab5473fb34412684bcd1907 Body: TestMessage
5/29/2022 11:45:43 PM: MessageId: a47f83d4a12845088ade427e084d8e39 Body: TestMessage
5/29/2022 11:45:43 PM: MessageId: 47ff6dd4f4134661a3616a9210670be5 Body: TestMessage
5/29/2022 11:45:43 PM: MessageId: d10b3602f57047f1bf613675e35793e0 Body: TestMessage
5/29/2022 11:45:43 PM: MessageId: 08a45405375e46ffb99db9812c3e3d78 Body: TestMessage
5/29/2022 11:45:43 PM: MessageId: d21cff4ae5b6453f9077b3805ace4e09 Body: TestMessage
C:\GitHub\ndn2323\bustest\BusReceiver\bin\Debug\net6.0>BusReceiver.exe
5/29/2022 11:45:48 PM: Starting receive from regular queue
5/29/2022 11:45:49 PM: 0 messages found
5/29/2022 11:45:49 PM: Starting receive from dead letter queue
5/29/2022 11:45:49 PM: 1 messages found in dlq
5/29/2022 11:45:49 PM: MessageId: d21cff4ae5b6453f9077b3805ace4e09 Body: TestMessage
C:\GitHub\ndn2323\bustest\BusReceiver\bin\Debug\net6.0>BusReceiver.exe
5/29/2022 11:46:03 PM: Starting receive from regular queue
5/29/2022 11:46:04 PM: 0 messages found
5/29/2022 11:46:04 PM: Starting receive from dead letter queue
5/29/2022 11:46:04 PM: 1 messages found in dlq
5/29/2022 11:46:04 PM: MessageId: d21cff4ae5b6453f9077b3805ace4e09 Body: TestMessage
CodePudding user response:
Due to variations in the network, service, and your application, it is normal to see batches of inconsistent size returned when calling ReceiveMessagesAsync
.
When receiving, there is no minimum batch size. The receiver will add enough credits to the link to allow maxMessageCount
to flow from the service but will not wait in an attempt to build a batch of that size. Once any messages have been transferred from the service, they will be returned as the batch. Because you specified a maxWaitTime
, if no messages were available on the service within that time, an empty batch will be returned.
Setting a PrefetchCount in your ServiceBusReceiverOptions
can help to smooth out the batch sizes. That said, it is important to be aware that locks are held for messages in the prefetch queue and are not automatically renewed, so finding a prefetch count too high will result in seeing expired locks.
In your example, the best approach may be to just perform your receive loop repeatedly until you see 1 (or more?) empty batches consecutively. That would be a strong indicator that the queue was empty.