RabbitMQ direct exchange is not consuming messages

Time:08-31

edit: edited to make it more generalized

I have a producer thread that needs to send a message to consumer threads in Java through a RabbitMQ direct exchange, but my consumers are not receiving the message. Below is the method my producer uses to publish the message:

Note that key is passed as a parameter; an earlier if-else statement sets it to either "key1" or "key2".

public void producerSend (Object object, String key) throws IOException, TimeoutException {
    
    String exchange = "directExchange";
    
    System.out.println("PRODUCER: Sent message");
    
    ConnectionFactory factory = new ConnectionFactory();
    byte[] byteArray = getByteArray(object);
    try (Connection con = factory.newConnection()){
        Channel chan = con.createChannel();
        chan.exchangeDeclare (exchange, BuiltinExchangeType.DIRECT);
        chan.basicPublish(exchange, key, null, byteArray);
    }
}

Below is the method my consumers use to consume the message:

public void consumerReceive() throws IOException, TimeoutException {
    String exchange = "directExchange";
    String key = "key1";

    ConnectionFactory factory = new ConnectionFactory();
    try (Connection con = factory.newConnection()) {  // added a try clause
        Channel chan = con.createChannel();
        chan.exchangeDeclare(exchange, "direct"); // direct exchange

        String queueName = chan.queueDeclare().getQueue();
        chan.queueBind(queueName, exchange, key);

        chan.basicConsume(queueName, true, (x, msg) -> {
            byte[] byteArray = msg.getBody();
            try {
                Object object = (Object) deserialize(byteArray);
                System.out.println("CONSUMER: Received message. Bye!");
            } catch (Exception e) {}
        }, x -> {});
    }
}

Note that I added a try clause when creating the consumer's connection because many consumers (threads) are continuously created, each one ordering and then waiting for a message. I want each of them to connect only ONCE to obtain its message and then close its connection (the consumer says bye).

Before I added the try clause, the consumers were able to receive the message and leave. But when the next consumer joined and waited for its message, the producer ended up sending the message to ALL of the consumers that had previously received theirs, because the connections of those consumers were still open. That's why I added the try clause, but I'm not sure why the consumers no longer receive any message after adding it (is the consumer connection not being created?).

I'd really appreciate it if anyone could help me resolve this problem. I only want one unique consumer to receive each message and then leave (close the connection with that consumer?).

CodePudding user response:

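I think you are closing the connection without waiting for the message. basicConsume only registers the consumer and returns immediately, so nothing stops the method from reaching the end of the try block. That closes the connection and the channel, which shuts down the consumer before any message is delivered.

So you need to wait, for example in a while loop, until the message has arrived: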

// Per-customer flag: not static, so one customer leaving does not release the others,
// and not final, because the delivery callback sets it to false.
private volatile boolean notLeftYet = true;

public void tamagoReceiveDish() throws IOException, TimeoutException, InterruptedException {
    String sushiDishEx = "sushiDishExchange";
    String tamagoKey = "tamagoDishKey";

    ConnectionFactory factory = new ConnectionFactory();
    try (Connection con = factory.newConnection()) {  // added a try clause
        Channel chan = con.createChannel();
        chan.exchangeDeclare(sushiDishEx, "direct"); // direct exchange

        String queueName = chan.queueDeclare().getQueue();
        chan.queueBind(queueName, sushiDishEx, tamagoKey);

        chan.basicConsume(queueName, true, (x, msg) -> {
            byte[] byteArray = msg.getBody();
            try {
                Sushi sushi = (Sushi) deserialize(byteArray);
                System.out.println("CUSTOMER " + this.custNo + " (ordered Tamago Sushi): Received " + sushi.sushiName + ".");
                System.out.println("CUSTOMER " + this.custNo + ": Left the restaurant.");
                notLeftYet = false;
            } catch (Exception e) {
                e.printStackTrace(); // do not swallow deserialization errors silently
            }
        }, x -> {});

        // basicConsume returns immediately, so keep the connection open until the message arrives.
        while (notLeftYet) {
            Thread.sleep(100);
        }
    }
}
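
As a possible alternative to polling with Thread.sleep, a java.util.concurrent.CountDownLatch lets the waiting thread block until the callback signals that the message has arrived, and it also gives you a timeout. The sketch below uses only the JDK so that it runs without a broker: the class LatchWaitSketch, the method awaitOneMessage and its parameters are hypothetical names for illustration, and the executor thread merely stands in for the client library's delivery thread. In the real consumer, the countDown() call would presumably go inside the delivery callback passed to basicConsume, with await() placed before the end of the try block.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class LatchWaitSketch {

    /**
     * Waits for one simulated delivery, or gives up after timeoutMillis.
     * Returns the received payload, or null on timeout or interruption.
     */
    public static String awaitOneMessage(String payload, long deliveryDelayMillis, long timeoutMillis) {
        CountDownLatch received = new CountDownLatch(1);
        AtomicReference<String> body = new AtomicReference<>();

        // Stand-in for the delivery thread (an assumption, not RabbitMQ code).
        ExecutorService deliveryThread = Executors.newSingleThreadExecutor();
        try {
            deliveryThread.submit(() -> {
                try {
                    Thread.sleep(deliveryDelayMillis); // simulated delay before the message arrives
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                body.set(payload);     // what the delivery callback would store
                received.countDown();  // signal the waiting thread
            });

            // Block here instead of looping with Thread.sleep(100).
            boolean arrived = received.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return arrived ? body.get() : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            deliveryThread.shutdownNow(); // plays the role of closing the connection
        }
    }

    public static void main(String[] args) {
        System.out.println("Received: " + awaitOneMessage("tamago", 50, 2000));
        System.out.println("Timed out: " + awaitOneMessage("tamago", 2000, 50));
    }
}
```

The timeout matters in this scenario: if no message is ever routed to the consumer's queue, the while loop above would wait forever, whereas await with a timeout lets the consumer give up and close its connection.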