Spring Boot RabbitMQ: how to convert received object using convertSendAndReceive() method?

Time:05-23

How can I deserialize the reply received through the convertSendAndReceive() method? I get a NullPointerException because the class required for deserialization lives in another package and cannot be found. The packages are shown in the code.

The listener receives and sends messages normally:

package org.dneversky.user;

@EnableRabbit
@Component
public class TestListener {

  private static final Logger logger = LoggerFactory.getLogger(TestListener.class);

  @Autowired
  private RabbitTemplate rabbitTemplate;

  @RabbitListener(queues = RabbitMQConfig.RECEIVE_QUEUE)
  public void doGet(UserReplyMessage message) {
    logger.info("Received message: {}", message);
    UserReplyMessage response = new UserReplyMessage();
    logger.info("Sending message: {}", response);
    rabbitTemplate.convertSendAndReceive(RabbitMQConfig.RPC_EXCHANGE,
        RabbitMQConfig.REPLY_QUEUE, response);
  }
}

Configuration of the listener

package org.dneversky.user.config;

@Configuration
public class RabbitMQConfig {

  public static final String RECEIVE_QUEUE = "rpc_queue";
  public static final String REPLY_QUEUE = "reply_queue";
  public static final String RPC_EXCHANGE = "rpc_exchange";

  @Bean
  public TopicExchange rpcExchange() {
    return new TopicExchange(RPC_EXCHANGE);
  }

  @Bean
  public Queue receiveQueue() {
    return new Queue(RECEIVE_QUEUE);
  }

  @Bean
  public Queue replyQueue() {
    return new Queue(REPLY_QUEUE);
  }

  @Bean
  public Binding receiveBinding() {
    return BindingBuilder.bind(receiveQueue()).to(rpcExchange()).with(RECEIVE_QUEUE);
  }

  @Bean
  public MessageConverter jsonMessageConverter() {
    return new Jackson2JsonMessageConverter();
  }
}

The sender sends a message normally, but it cannot deserialize the returned message:

package org.dneversky.gateway.servie.impl;

@Service
public class UserServiceImpl {

  private static final Logger logger = LoggerFactory.getLogger(UserServiceImpl.class);

  @Autowired
  private RabbitTemplate rabbitTemplate;

  public UserPrincipal getUserByUsername(String username) {
    UserResponse message = new UserResponse(username);
    logger.info("Sending created message: {}", message);
    UserResponse result = (UserResponse) rabbitTemplate.convertSendAndReceive(
        RabbitMQConfig.RPC_EXCHANGE, RabbitMQConfig.RPC_QUEUE, message);
    logger.info("Getting response: {}", result);

    return null;
  }
}

Configuration of the Sender

package org.dneversky.gateway.config;

@Configuration
public class RabbitMQConfig {

  public static final String RPC_QUEUE = "rpc_queue";
  public static final String REPLY_QUEUE = "reply_queue";
  public static final String RPC_EXCHANGE = "rpc_exchange";

  @Bean
  public Queue rpcQueue() {
    return new Queue(RPC_QUEUE);
  }

  @Bean
  public Queue replyQueue() {
    return new Queue(REPLY_QUEUE);
  }

  @Bean
  public TopicExchange rpcExchange() {
    return new TopicExchange(RPC_EXCHANGE);
  }

  @Bean
  public Binding binding() {
    return BindingBuilder.bind(replyQueue()).to(rpcExchange()).with(REPLY_QUEUE);
  }

  @Bean
  public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setExchange(RPC_EXCHANGE);
    rabbitTemplate.setReplyAddress(REPLY_QUEUE);
    rabbitTemplate.setReplyTimeout(6000);
    rabbitTemplate.setMessageConverter(messageConverter());

    return rabbitTemplate;
  }

  @Bean
  public MessageConverter messageConverter() {
    return new Jackson2JsonMessageConverter();
  }

  @Bean
  public SimpleMessageListenerContainer replyContainer(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames(REPLY_QUEUE);
    container.setMessageListener(rabbitTemplate(connectionFactory));
    return container;
  }
}

Error log

2022-05-22 17:12:31.344 ERROR 16920 --- [nio-8080-exec-1] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.springframework.amqp.support.converter.MessageConversionException: failed to resolve class name. Class not found [org.dneversky.user.model.UserReplyMessage]] with root cause

java.lang.ClassNotFoundException: org.dneversky.user.model.UserReplyMessage

CodePudding user response:

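I don't see your messageConverter bean being used inside your replyContainer. By default, Spring AMQP sends and receives objects using Java serialization instead of converting them to human-readable JSON. You can register a listener container factory that uses the JSON converter: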

@Bean
public SimpleRabbitListenerContainerFactory customListenerContainerFactory(
    ConnectionFactory connectionFactory, MessageConverter jsonMessageConverter) {
  SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
  factory.setConnectionFactory(connectionFactory);
  factory.setMessageConverter(jsonMessageConverter);
  return factory;
}

Then reference that factory in your consumer:

@RabbitListener(queues = RabbitConstants.YOUR_QUEUE_NAME, containerFactory = "customListenerContainerFactory")
public void onMessage(@Valid YourEvent yourEvent) {
  // your code
}

CodePudding user response:

By default, the producer sets the __TypeId__ header to the name of the class that was serialized.

The consumer then uses the __TypeId__ header to decide which class to instantiate when converting the JSON back to a Java object.
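To make the failure mode concrete, here is a deliberately simplified, plain-Java model of header-based type resolution. It is not Spring AMQP's actual implementation; the class TypeIdResolutionSketch, the method resolve, and the nested UserResponse stand-in are hypothetical names used only for illustration. The only value taken from the question is the class name reported in the error log.

```java
import java.util.HashMap;
import java.util.Map;

public class TypeIdResolutionSketch {

  /** Hypothetical stand-in for the gateway's own response class. */
  static class UserResponse {}

  /**
   * Simplified model: consult an explicit id-to-class mapping first, then
   * treat the header value as a fully qualified class name.
   * Returns null when the class is not on the local classpath.
   */
  static Class<?> resolve(String typeId, Map<String, Class<?>> idClassMapping) {
    Class<?> mapped = idClassMapping.get(typeId);
    if (mapped != null) {
      return mapped;
    }
    try {
      return Class.forName(typeId);
    } catch (ClassNotFoundException e) {
      return null;
    }
  }

  public static void main(String[] args) {
    // Header value written by the listener application (from the error log).
    String typeId = "org.dneversky.user.model.UserReplyMessage";

    // Without a mapping the gateway cannot find the listener's class.
    System.out.println(resolve(typeId, new HashMap<>()));

    // With a mapping the same header value resolves to a local class.
    Map<String, Class<?>> mapping = new HashMap<>();
    mapping.put(typeId, UserResponse.class);
    System.out.println(resolve(typeId, mapping).getSimpleName());
  }
}
```

In this model the first lookup fails for the same reason as in the error log: the header names a class that exists only in the other application.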

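You serialize and deserialize with two different classes (the listener replies with org.dneversky.user.model.UserReplyMessage, which does not exist in the gateway), so you have to configure the converter to map between them.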
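One way to configure the converter on the sender side is sketched below, as a possible replacement for the existing messageConverter() bean in the gateway's RabbitMQConfig. It assumes that the gateway's UserResponse class has JSON fields compatible with the listener's UserReplyMessage, and the package org.dneversky.gateway.model is a guess, since the question does not show where UserResponse lives. Treat it as a starting point rather than a verified fix.

```java
package org.dneversky.gateway.config;

import java.util.HashMap;
import java.util.Map;

import org.dneversky.gateway.model.UserResponse; // assumed package, not shown in the question
import org.springframework.amqp.support.converter.DefaultJackson2JavaTypeMapper;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

  // Other beans from the original class are omitted here for brevity.

  @Bean
  public MessageConverter messageConverter() {
    // Map the type id written by the listener to a class the gateway actually has.
    Map<String, Class<?>> idClassMapping = new HashMap<>();
    idClassMapping.put("org.dneversky.user.model.UserReplyMessage", UserResponse.class);

    DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper();
    typeMapper.setIdClassMapping(idClassMapping);

    Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
    converter.setJavaTypeMapper(typeMapper);
    return converter;
  }
}
```

An alternative worth checking against your Spring AMQP version is RabbitTemplate.convertSendAndReceiveAsType(...) with a ParameterizedTypeReference, which tells the converter the expected reply type directly instead of relying on the header.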

CodePudding user response:

In the listener's configuration class (where jsonMessageConverter() is defined), add this bean to bind your message converter to the listener container factory:

@Bean
public SimpleRabbitListenerContainerFactory jsaFactory(
    ConnectionFactory connectionFactory,
    SimpleRabbitListenerContainerFactoryConfigurer configurer) {
  SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
  configurer.configure(factory, connectionFactory);
  factory.setMessageConverter(jsonMessageConverter());
  return factory;
}

Also, in the TestListener class, you should replace this line

@RabbitListener(queues = RabbitMQConfig.RECEIVE_QUEUE) 

with this one

@RabbitListener(queues = RabbitMQConfig.RECEIVE_QUEUE, containerFactory = "jsaFactory")