I Started by setting up an interceptor for outgoing messages which is working smoothly, but when i try to intercept incomming messages in the consumers, the postProcessMessage method is skipped and the message reaches the method annotated with @RabbitListener, bellow is my code for the whole proccess, i ommited unimportant code.
Producer
RabbitMQProducerInterceptor
@Component
@Slf4j
public class RabbitMQProducerInterceptor implements MessagePostProcessor {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
log.info("Getting the current HttpServletRequest");
HttpServletRequest req = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes())
.getRequest();
log.info("Extracting the X-REQUEST-ID from the header of the HttpServletRequest");
String XRequestId = req.getHeader(ShareableConstants.X_REQUEST_ID_HEADER);
log.info("Adding X-REQUEST-ID {} to the RabbitMQ Producer Header", XRequestId);
message.getMessageProperties().getHeaders().put(ShareableConstants.X_REQUEST_ID_HEADER, XRequestId);
return message;
}
}
RabbitMQProducerConfig
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setReplyTimeout(60000);
MessagePostProcessor[] processors = {new RabbitMQProducerInterceptor()};
rabbitTemplate.addBeforePublishPostProcessors(processors);
return rabbitTemplate;
}
Sending a message to the consumer
User Producer
public UserRegistrationResponseDTO register(UserRegistrationDTO userRegistrationDTO) {
log.info("Sending user registration request {}", userRegistrationDTO);
UserRegistrationDTO response = (UserRegistrationDTO) rabbitTemplate
.convertSendAndReceive(ShareableConstants.EXCHANGE_NAME,
ShareableConstants.CREATE_USER_ROUTING_KEY,
userRegistrationDTO);
return UserRegistrationResponseDTO.builder()
.username(response.getUsername())
.id(response.getId())
.createdAt(response.getCreatedAt()).build();
}
Consumer
RabbitMQConsumerConfig
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
MessagePostProcessor[] processors = {new RabbitMQConsumerInterceptor()};
rabbitTemplate.setAfterReceivePostProcessors(processors);
return rabbitTemplate;
}
RabbitMQConsumerInterceptor
@Component
public class RabbitMQConsumerInterceptor implements MessagePostProcessor {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
String XRequestId = message.getMessageProperties().getHeader(ShareableConstants.X_REQUEST_ID_HEADER);
MDC.put(ShareableConstants.X_REQUEST_ID_HEADER, XRequestId);
return message;
}
}
User Consumer
@RabbitListener(bindings =
@QueueBinding(exchange = @Exchange(ShareableConstants.EXCHANGE_NAME),
key = ShareableConstants.CREATE_USER_ROUTING_KEY,
value = @Queue(ShareableConstants.USER_REGISTRATION_QUEUE_NAME)))
public UserRegistrationDTO receiveUser(UserRegistrationDTO userRegistrationDTO) {
log.info("receiving user {} to register ", userRegistrationDTO);
User user = Optional.of(userRegistrationDTO).map(User::new).get();
User createdUser = userService.register(user);
UserRegistrationDTO registrationDTO = UserRegistrationDTO.builder()
.id(createdUser.getId())
.username(createdUser.getUsername())
.createdAt(createdUser.getCreationDate())
.build();
return registrationDTO;
}
Here's the code, no exception is thrown, the only problem is the Interceptor being skipped
CodePudding user response:
The RabbitTemplate
is not used to receive messages for a @RabbitListener
; messages are received by a listener container; you have to set the afterReceivePostProcessors
on the listener container factory.
If you are using Spring Boot, just add the auto-configured SimpleRabbitListenerContainerFactory
as a parameter to one of your other @Bean
s and set the MPP on it.