I have a simple question but I can't find an answer for that:
In a RabbitMQ queue, with multiple consumers and a single publisher, is it possible for the publisher to stop sending messages to the other consumers until the first one process the message and send the ack to the publisher?
What I want to achieve:
Queue -> Q1
Sender -> S1
Consumers -> C1 - C2
- C1 and C2 connect to Q1
- S1 send multiple messages to Q1
- C1 gets the message and do some works
- After let's say 10 seconds, C1 send ACK to Q1
- Q1 send the queued message to C2 and the cycle continue
No matter what I tried (single active consumer | reject and nack instead of ack | BasicGet instead of BasicConsume), it doesn't seem to do what I have in mind.
Am I trying to achieve something that is impossible for RabbitMQ or I'm doing something wrong?
Code of the sender:
public static ConnectionFactory _FACTORY = new ConnectionFactory() { UserName = "guest", Password = "guest", HostName = "localhost" };
public static IDictionary<string, object> _ARGUMENTS = new Dictionary<string, object>();
public static void Main(string[] args)
{
IConnection connection = _FACTORY.CreateConnection();
IModel channel = connection.CreateModel();
SendMessagesQueue(channel);
channel.Close();
connection.Close();
}
public static void SendMessagesQueue(IModel channel)
{
Console.WriteLine("QUEUE");
int i = 0;
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
channel.BasicQos(0,2,true);
while (Console.ReadLine() != null)
{
i ;
var message = DateTime.Now " - " i;
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish("", "queue", properties, body);
Console.WriteLine(" [x] Sent {0}", message);
}
}
Code of the receivers (they are the same):
public static ConnectionFactory _FACTORY = new ConnectionFactory() { HostName = "localhost" };
public static IDictionary<string, object> _ARGUMENTS = new Dictionary<string, object>();
public static int id = 0;
public static IConnection connection = null;
public static async Task Main(string[] args)
{
IConnection connection = _FACTORY.CreateConnection();
IModel channel = connection.CreateModel();
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
ReceiveMessagesQueue(channel, consumer);
channel.Close();
connection.Close();
}
public static void ReceiveMessagesQueue(IModel channel, EventingBasicConsumer consumer)
{
Console.WriteLine("QUEUE");
while (Console.ReadLine() != null)
{
Console.WriteLine(" [*] Waiting for messages.");
consumer.Received = (sender, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine("RECEIVED " message " - SLOW - AT " DateTime.Now);
Thread.Sleep(200000000);//remove it for the fast ack
channel.BasicAck(ea.DeliveryTag, false);
Console.WriteLine("ACK SENT " DateTime.Now);
Console.WriteLine("\n");
};
channel.BasicGet("queue", false);
};
}
CodePudding user response:
is it possible for the publisher to stop sending messages to the other consumers until the first one process the message and send the ack to the publisher?
Yes. Here is one way:
- Declare three queues: q1, q1-reply, q2
- Consumer C1 consumes from q1, other consumers consume from q2
- Publisher S1 consumes from queue q1-reply
- Publisher S1 publishes a message (with publisher confirmation!) to q1
- Publisher waits for a message from q1-reply
- Consumer C1 does its work, when done, it publishes a message to q1-reply indicating that
- Publisher S1 gets the "done" message, and publishes to q2
- Consumer C2 sees the message on q2, and does its work
If you need further assistance, provide a git repository with your complete code and open a discussion here where we can continue work on this:
https://github.com/rabbitmq/rabbitmq-dotnet-client/discussions
NOTE: the RabbitMQ team monitors the rabbitmq-users
mailing list and only sometimes answers questions on StackOverflow.