How can I intercept incoming messages before they reach methods annotated with @RabbitListener?

Time:04-26

I started by setting up an interceptor for outgoing messages, which works smoothly. But when I try to intercept incoming messages in the consumer, the postProcessMessage method is skipped and the message goes straight to the method annotated with @RabbitListener. Below is my code for the whole process; I omitted the unimportant parts.

Producer

RabbitMQProducerInterceptor

@Component
@Slf4j
public class RabbitMQProducerInterceptor implements MessagePostProcessor {

    @Override
    public Message postProcessMessage(Message message) throws AmqpException {

        log.info("Getting the current HttpServletRequest");
        HttpServletRequest req = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes())
                .getRequest();

        log.info("Extracting the X-REQUEST-ID from the header of the HttpServletRequest");
        String XRequestId = req.getHeader(ShareableConstants.X_REQUEST_ID_HEADER);

        log.info("Adding X-REQUEST-ID {} to the RabbitMQ Producer Header", XRequestId);
        message.getMessageProperties().getHeaders().put(ShareableConstants.X_REQUEST_ID_HEADER, XRequestId);

        return message;
    }
}

RabbitMQProducerConfig

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setReplyTimeout(60000);
        MessagePostProcessor[] processors = {new RabbitMQProducerInterceptor()};
        rabbitTemplate.addBeforePublishPostProcessors(processors);
        return rabbitTemplate;
    }

Sending a message to the consumer

User Producer

    public UserRegistrationResponseDTO register(UserRegistrationDTO userRegistrationDTO) {
        log.info("Sending user registration request {}", userRegistrationDTO);

        UserRegistrationDTO response = (UserRegistrationDTO) rabbitTemplate
                .convertSendAndReceive(ShareableConstants.EXCHANGE_NAME,
                        ShareableConstants.CREATE_USER_ROUTING_KEY,
                        userRegistrationDTO);

        return UserRegistrationResponseDTO.builder()
                .username(response.getUsername())
                .id(response.getId())
                .createdAt(response.getCreatedAt()).build();
    }

Consumer

RabbitMQConsumerConfig

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        MessagePostProcessor[] processors = {new RabbitMQConsumerInterceptor()};
        rabbitTemplate.setAfterReceivePostProcessors(processors);
        return rabbitTemplate;
    }

RabbitMQConsumerInterceptor


@Component
public class RabbitMQConsumerInterceptor implements MessagePostProcessor {

    @Override
    public Message postProcessMessage(Message message) throws AmqpException {

        String XRequestId = message.getMessageProperties().getHeader(ShareableConstants.X_REQUEST_ID_HEADER);

        MDC.put(ShareableConstants.X_REQUEST_ID_HEADER, XRequestId);

        return message;
    }
}

User Consumer

    @RabbitListener(bindings =
        @QueueBinding(exchange = @Exchange(ShareableConstants.EXCHANGE_NAME),
                key = ShareableConstants.CREATE_USER_ROUTING_KEY,
                value = @Queue(ShareableConstants.USER_REGISTRATION_QUEUE_NAME)))
    public UserRegistrationDTO receiveUser(UserRegistrationDTO userRegistrationDTO) {
        log.info("receiving user {} to register ", userRegistrationDTO);
        User user = Optional.of(userRegistrationDTO).map(User::new).get();
        User createdUser = userService.register(user);

        UserRegistrationDTO registrationDTO = UserRegistrationDTO.builder()
                .id(createdUser.getId())
                .username(createdUser.getUsername())
                .createdAt(createdUser.getCreationDate())
                .build();

        return registrationDTO;
    }

That's all the code. No exception is thrown; the only problem is that the interceptor is skipped.

Answer:

The RabbitTemplate is not used to receive messages for a @RabbitListener. Those messages are received by a listener container, so post-processors set on the template never see them. You have to set the afterReceivePostProcessors on the listener container factory instead.
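For an application that declares its own listener container factory (that is, without relying on Spring Boot's auto-configured one), a minimal sketch could look like the following. The class name ListenerFactoryConfig is made up for illustration, and RabbitMQConsumerInterceptor is assumed to be the @Component from the question, living in the same package.

```java
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class; the name is an assumption for this sketch.
@Configuration
@EnableRabbit
public class ListenerFactoryConfig {

    // "rabbitListenerContainerFactory" is the bean name @RabbitListener
    // looks up by default when no containerFactory attribute is given.
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ConnectionFactory connectionFactory,
            RabbitMQConsumerInterceptor interceptor) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // Applied to each received message before the listener method is invoked.
        factory.setAfterReceivePostProcessors(interceptor);
        return factory;
    }
}
```

A factory built by hand like this does not pick up anything else automatically, so a message converter or other settings the listener depends on would have to be set on it as well.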

If you are using Spring Boot, just add the auto-configured SimpleRabbitListenerContainerFactory as a parameter to one of your other @Bean methods and set the MessagePostProcessor (MPP) on it.
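Applied to the consumer configuration from the question, that might look roughly like this. It is only a sketch: it assumes Spring Boot's default (simple) container type, so that a SimpleRabbitListenerContainerFactory bean is available for injection, and it reuses the existing rabbitTemplate bean method purely as a convenient place to touch the factory.

```java
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConsumerConfig {

    // RabbitMQConsumerInterceptor is the @Component from the question,
    // assumed here to be in the same package as this class.
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,
                                         SimpleRabbitListenerContainerFactory factory,
                                         RabbitMQConsumerInterceptor interceptor) {
        // The listener container factory, not the template, is what
        // @RabbitListener methods receive their messages through.
        factory.setAfterReceivePostProcessors(interceptor);
        return new RabbitTemplate(connectionFactory);
    }
}
```

Injecting the interceptor bean instead of calling new RabbitMQConsumerInterceptor() avoids ending up with a second, unmanaged instance next to the @Component one.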
