I made some testing with an .NET 6.0 API regarding writing messages to the Azure service bus (topic).
In my testing scenario my .NET 6.0 API receives 10 requests at once.
Each request should write a message with a unique identifier to the Azure service bus.
To make my API respond faster, I started with a very naive approach and created an additional task within each call to perform each operation like this:
message.Name = {SOME UNIQUE ID OF MY CALL}
var task = new Task(async () =>
{
var messageObject = new { message.Name, message.Body };
var messageJson = JsonConvert.SerializeObject(messageObject);
ServiceBusSender sender = _serviceBusClient.CreateSender(_topicName);
await sender.SendMessageAsync(new ServiceBusMessage(messageJson));
});
task.Start();
Now the user does not have to wait for the message to be sent, but directly receives a posivite response even before the message has successfully been put on the bus.
What I expected:
I expected that only a subset or none of my 10 messages will be sent successfully to the bus, because after the request has finished, all objects (including my inline-task) should be destroyed.
Instead I noticed that all 10 messages are successfully sent to the bus. But with double unique identifiers.
So if I have those unique ids:
0, 1, 2, 3, 4, 5, 6, 7, 8, 9
The bus ends up receiving always ten messages, but some uids are double, others are not sent at all, like:
0, 0, 0, 1, 4, 5, 6, 6, 9, 9
If all calls are not sent concurrently, but sequencelly I get:
0, 1, 2, 3, 4, 5, 6, 7, 8, 9 (what makes sense)
I have two concrete questions now:
Why are always all messages transferred to the bus, even though the main Task of the request ends before the inner task ends?
Why are some messages double? Others are not transferred at all?
Any ideas?
By the way: In my production scenario I am not doing it like this. I am using a "BackgroundService" to perform background jobs like that.
CodePudding user response:
When you call Task.Start
your task is not necessarily executed immediately, it is scheduled and will run at some non-deterministic point in the future. When it executes, it will capture the current value of message.Name
, not the value that existed at the time you created the task.
To capture the current message
instance at the time you create the task, you'd want to use a form similar to:
message.Name = {SOME UNIQUE ID OF MY CALL}
var task = Task.Factory.StartNew(async state =>
{
var messageParam = (MessgeType)state;
var messageObject = new { messageParam.Name, messageParam.Body };
var messageJson = JsonConvert.SerializeObject(messageObject);
await using var sender = _serviceBusClient.CreateSender(_topicName);
await sender.SendMessageAsync(new ServiceBusMessage(messageJson));
}, message);
Why are always all messages transferred to the bus, even though the main Task of the request ends before the inner task ends?
When you call Task.Start
, you're queueing the task for execution, where the queue is owned by constructs related to the thread pool. These are not bound to the lifespan of your code, they're part of the runtime. Your task
variable is no longer in scope, but the instance that it references is still held by the runtime.
It does leave you in a position where exceptions go unhandled and unobserved. Depending on when they occur, the severity, and the host environment, in some extreme cases the exception may crash the host process and/or impact the throughput of the thread pool and you'd have little understanding of why.
ServiceBusSender sender = _serviceBusClient.CreateSender(_topicName);
This line is also problematic. You're creating a new sender for each operation to the same topic, which is going to perform poorly and increase resource use. The ServiceBusSender
is intended to be long-lived and reused. I'd strongly advise caching and reusing your sender whenever publishing to _topicName
.
You are also not closing or disposing the sender. This is going to leave its AMQP link open and consuming network resources unnecessarily until it idles out. Even then, you'll have pending clean-up waiting for the finalizer.