How can I deserialize a message using convertSendAndReceive() method? It gives me NullPointerException
due to not being able to find the required class for deserialization in another package. Packages are marked in the code.
Listener receives and sends messages normally
package org.dneversky.user;
@EnableRabbit
@Component
public class TestListener {
private static final Logger logger = LoggerFactory.getLogger(TestListener.class);
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = RabbitMQConfig.RECEIVE_QUEUE)
public void doGet(UserReplyMessage message) {
logger.info("Received message: {}", message);
UserReplyMessage response = new UserReplyMessage();
logger.info("Sending message: {}", response);
rabbitTemplate.convertSendAndReceive(RabbitMQConfig.RPC_EXCHANGE,
RabbitMQConfig.REPLY_QUEUE, response);
}
}
Configuration of the listener
package org.dneversky.user.config;
@Configuration
public class RabbitMQConfig {
public static final String RECEIVE_QUEUE = "rpc_queue";
public static final String REPLY_QUEUE = "reply_queue";
public static final String RPC_EXCHANGE = "rpc_exchange";
@Bean
public TopicExchange rpcExchange() {
return new TopicExchange(RPC_EXCHANGE);
}
@Bean
public Queue receiveQueue() {
return new Queue(RECEIVE_QUEUE);
}
@Bean
public Queue replyQueue() {
return new Queue(REPLY_QUEUE);
}
@Bean
public Binding receiveBinding() {
return BindingBuilder.bind(receiveQueue()).to(rpcExchange()).with(RECEIVE_QUEUE);
}
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
}
Sender sends a message normally, but it can't to deserialize returning message
package org.dneversky.gateway.servie.impl;
@Service
public class UserServiceImpl {
private static final Logger logger = LoggerFactory.getLogger(UserServiceImpl.class);
@Autowired
private RabbitTemplate rabbitTemplate;
public UserPrincipal getUserByUsername(String username) {
UserResponse message = new UserResponse(username);
logger.info("Sending created message: {}", message);
UserResponse result = (UserResponse) rabbitTemplate.convertSendAndReceive(RabbitMQConfig.RPC_EXCHANGE, RabbitMQConfig.RPC_QUEUE, message);
logger.info("Getting response: {}", result);
return null;
}
}
Configuration of the Sender
package org.dneversky.gateway.config;
@Configuration
public class RabbitMQConfig {
public static final String RPC_QUEUE = "rpc_queue";
public static final String REPLY_QUEUE = "reply_queue";
public static final String RPC_EXCHANGE = "rpc_exchange";
@Bean
public Queue rpcQueue() {
return new Queue(RPC_QUEUE);
}
@Bean
public Queue replyQueue() {
return new Queue(REPLY_QUEUE);
}
@Bean
public TopicExchange rpcExchange() {
return new TopicExchange(RPC_EXCHANGE);
}
@Bean
public Binding binding() {
return BindingBuilder.bind(replyQueue()).to(rpcExchange()).with(REPLY_QUEUE);
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setExchange(RPC_EXCHANGE);
rabbitTemplate.setReplyAddress(REPLY_QUEUE);
rabbitTemplate.setReplyTimeout(6000);
rabbitTemplate.setMessageConverter(messageConverter());
return rabbitTemplate;
}
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public SimpleMessageListenerContainer replyContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(REPLY_QUEUE);
container.setMessageListener(rabbitTemplate(connectionFactory));
return container;
}
}
Error log
2022-05-22 17:12:31.344 ERROR 16920 --- [nio-8080-exec-1] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.springframework.amqp.support.converter.MessageConversionException: failed to resolve class name. Class not found [org.dneversky.user.model.UserReplyMessage]] with root cause
java.lang.ClassNotFoundException: org.dneversky.user.model.UserReplyMessage
CodePudding user response:
inside your replyContainer
I couldn't see your messageConverter
bean. In default it uses java objects to send and receive messages without converting them into human readable json.
@Bean
public SimpleRabbitListenerContainerFactory customListenerContainerFactory(ConnectionFactory connectionFactory,
MessageConverter jsonMessageConverter) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(jsonMessageConverter);
return factory;
}
for your consumer;
@RabbitListener(queues = RabbitConstants.YOUR_QUEUE_NAME, containerFactory = "customListenerContainerFactory")
public void onMessage(@Valid YourEvent YourEvent){
//your code
}
CodePudding user response:
by default, the producer set the _TypeID_
header as the class name used for the serialization of the object
then consumer uses _TypeID_
header to know the class that should use to convert the JSON to java instance
you use two different classes to serialize and deserialize the object and you have to configure the converter
CodePudding user response:
Inside the Listener class, you need to add this line to bind your message converter
@Bean
public SimpleRabbitListenerContainerFactory jsaFactory(ConnectionFactory connectionFactory,
SimpleRabbitListenerContainerFactoryConfigurer configurer) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
factory.setMessageConverter(jsonMessageConverter());
return factory;
}
Also, in the TestListener class, you should replace this line
@RabbitListener(queues = RabbitMQConfig.RECEIVE_QUEUE)
with this one
@RabbitListener(queues = RabbitMQConfig.RECEIVE_QUEUE,containerFactory="jsaFactory")