I'm kinda new to RabbitMQ, so please bear with me. I use the standard RabbitMQ .NET client package: https://www.rabbitmq.com/dotnet.html
When you implement the client side of the Request-Reply pattern, you use the same methods as usual. Message consumption:
_consumer.Received += (model, ea) =>
{
    var reply = JsonSerializer.Deserialize<TReply>
    (
        Encoding.UTF8.GetString(ea.Body.ToArray())
    );
    /* Consume the message */
};
_channel.BasicConsume
(
    queue: _replyQueue.QueueName,
    autoAck: true,
    consumer: _consumer
);
Message publishing:
var properties = _channel.CreateBasicProperties();
properties.ReplyTo = _replyQueue.QueueName;
properties.CorrelationId = Guid.NewGuid().ToString();
var body = Encoding.UTF8.GetBytes
(
    JsonSerializer.Serialize(request)
);
_channel.BasicPublish("", _requestQueue.QueueName, properties, body);
So, message publishing and message consumption are still implemented independently of each other. In addition, you have the properties.CorrelationId property, which lets you match a reply to its request on your own. I feel like this is too low-level an approach, which shouldn't be used directly on the application level. Instead, this should be wrapped into some higher-level library client. On the application level I want to see something like this:
var client = new MyRabbitMQRequestReplyClient();
var reply = await client.RequestAsync(request);
I think all the request-reply CorrelationId matching should be hidden from the application-level developer.
This approach may look kinda unsafe, because we wait sequentially until the reply is received instead of doing everything in parallel. But sometimes we really want exactly this behavior. For example, a user clicks a button, the client requests something from a remote service, and once a reply is received, the user should get it right away. The user is OK to wait for this reply - it feels like natural behavior for this particular UI.
So, the questions are: is this a good approach for the cases when we really want to wait for the reply? If it's not, then why, and what is a better approach? I don't want to mess with CorrelationId matching every time I do request-reply on the application level. If this approach is good, then what is the best way to implement it?
CodePudding user response:
If you're using something like direct reply-to, then this would be useful. I'd hesitate to use this approach if the reply queue were persistent or shared, because you want to ensure that only the client that sent the request will get the response.
The usual solution is to have a concurrent dictionary mapping from the correlation id to a TaskCompletionSource<TReply>. When sending a new request, insert a new item into that dictionary and then return the TCS's Task property. When replies come in, retrieve the TCS from the dictionary and complete it.
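The dictionary part of this could look roughly like the sketch below. It is a minimal illustration, not a finished client: PendingReplies<TReply> and its method names are made up for this example and are not part of RabbitMQ.Client, and the sketch deliberately leaves out the channel code, timeouts, and reconnect handling.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical helper (not part of RabbitMQ.Client): tracks requests that are
// still waiting for a reply, keyed by correlation id.
public sealed class PendingReplies<TReply>
{
    private readonly ConcurrentDictionary<string, TaskCompletionSource<TReply>> _pending =
        new ConcurrentDictionary<string, TaskCompletionSource<TReply>>();

    // Call before publishing the request; await the returned task to get the reply.
    public Task<TReply> Register(string correlationId)
    {
        // RunContinuationsAsynchronously keeps the awaiting code from running
        // inline on the thread that later completes the task.
        var tcs = new TaskCompletionSource<TReply>(
            TaskCreationOptions.RunContinuationsAsynchronously);

        if (!_pending.TryAdd(correlationId, tcs))
            throw new InvalidOperationException("Duplicate correlation id: " + correlationId);

        return tcs.Task;
    }

    // Call when a reply arrives. Returns false if no request is waiting for this id.
    public bool TryComplete(string correlationId, TReply reply)
    {
        return _pending.TryRemove(correlationId, out var tcs) && tcs.TrySetResult(reply);
    }

    // Call on timeout or publish failure so the entry does not stay in the dictionary.
    public bool TryCancel(string correlationId)
    {
        return _pending.TryRemove(correlationId, out var tcs) && tcs.TrySetCanceled();
    }
}
```

With the code from the question, a RequestAsync method would call Register(properties.CorrelationId) just before _channel.BasicPublish and return the resulting task, and the Received handler would call TryComplete(ea.BasicProperties.CorrelationId, reply) after deserializing. Replies with an unknown id (late or duplicate) simply return false and can be logged or dropped.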