Home > Mobile >  Sending-receiving collection over RabbitMQ in Spring Boot
Sending-receiving collection over RabbitMQ in Spring Boot

Time:11-12

Assume the config, applied both on sending and receiving sides:

@Configuration
@EnableRabbit
public class EventsConfig {
    @Bean
    public Jackson2JsonMessageConverter messageConverter(ObjectMapper objectMapper) {
        return new Jackson2JsonMessageConverter(objectMapper);
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter jackson2JsonMessageConverter) {
        final var rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter);
        return rabbitTemplate;
    }
}

Assume also, that there is a simple Spring Boot RabbitMQ listener on receiving side, declared like:

    @RabbitListener(queues = "${amqp.inbox}")
    public void listen(org.springframework.amqp.core.Message message) {
        ...
    }

Attempt to receive a collection of List<TaskAssignment> (where TaskAssignment is a simple POJO having two UUIDs), sent by rabbit template, is ended on receiver side with exception:

accounting-service_1  | 01:26:39.610 WARN  [org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler$DefaultExceptionStrategy] - Fatal message conversion error; message rejected; it will be dropped or routed to a dead letter exchange, if so configured: (Body:'[{"assigneeId":"1033c1ab-f277-4162-a711-722966bf69ec","taskId":"ae408e2d-39e0-4094-bf52-8c171d965be7"}]' MessageProperties [headers={__ContentTypeId__=java.lang.Object, __TypeId__=io.vavr.collection.List$Cons}, messageId=83f6bb30-a2b8-434a-867e-08684241e9b0, type=TASKS_REASSIGNED, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=outbox, receivedRoutingKey=, deliveryTag=3, consumerTag=amq.ctag-PodhhDJx6JtYsjIg6Cof1Q, consumerQueue=accounting-service])
accounting-service_1  | 01:26:39.610 ERROR [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer] - Execution of Rabbit message listener failed, and the error handler threw an exception
accounting-service_1  | org.springframework.amqp.AmqpRejectAndDontRequeueException: Error Handler converted exception to fatal
accounting-service_1  |     at org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler.handleError(ConditionalRejectingErrorHandler.java:146)
accounting-service_1  |     at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeErrorHandler(AbstractMessageListenerContainer.java:1460)
accounting-service_1  |     at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.handleListenerException(AbstractMessageListenerContainer.java:1744)
accounting-service_1  |     at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1519)
accounting-service_1  |     at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:968)
accounting-service_1  |     at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:914)
accounting-service_1  |     at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83)
accounting-service_1  |     at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1289)
accounting-service_1  |     at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1195)
accounting-service_1  |     at java.base/java.lang.Thread.run(Unknown Source)
accounting-service_1  | Caused by: org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Failed to convert message
accounting-service_1  |     at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:146)
accounting-service_1  |     at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1656)
accounting-service_1  |     at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1575)
accounting-service_1  |     at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1563)
accounting-service_1  |     at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1554)
accounting-service_1  |     at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1498)
accounting-service_1  |     ... 6 common frames omitted
accounting-service_1  | Caused by: org.springframework.amqp.support.converter.MessageConversionException: Failed to convert Message content
accounting-service_1  |     at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.doFromMessage(AbstractJackson2MessageConverter.java:294)
accounting-service_1  |     at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.fromMessage(AbstractJackson2MessageConverter.java:271)
accounting-service_1  |     at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.fromMessage(AbstractJackson2MessageConverter.java:251)
accounting-service_1  |     at org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener.extractMessage(AbstractAdaptableMessageListener.java:344)
accounting-service_1  |     at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter$MessagingMessageConverterAdapter.extractPayload(MessagingMessageListenerAdapter.java:347)
accounting-service_1  |     at org.springframework.amqp.support.converter.MessagingMessageConverter.fromMessage(MessagingMessageConverter.java:132)
accounting-service_1  |     at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.toMessagingMessage(MessagingMessageListenerAdapter.java:229)
accounting-service_1  |     at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:136)
accounting-service_1  |     ... 11 common frames omitted
accounting-service_1  | Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize value of type `byte` from Object value (token `JsonToken.START_OBJECT`)
accounting-service_1  |  at [Source: (String)"[{"assigneeId":"1033c1ab-f277-4162-a711-722966bf69ec","taskId":"ae408e2d-39e0-4094-bf52-8c171d965be7"}]"; line: 1, column: 2] (through reference chain: byte[][0])
accounting-service_1  |     at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)

Vavr module for Jackson is installed in ObjectMapper, so the root cause is not the fact that the collection is wrong and unserializable.

Notable thing about the stack trace is the headers part:

headers={__ContentTypeId__=java.lang.Object, __TypeId__=io.vavr.collection.List$Cons}

My theory is: since listener has received a JSON array, and Jackson requires explicit TypeReference to be provided in order to deduce a POJO, to which to deserialize array's items, that was the reason failure occurred in Jackson2MessageConverter. Nobody have given it the contents type.

The question is - how to provide Jackson2MessageConverter with such info? RabbitTemplate doesn't seem to give any means. Explicit setup of __ContentTypeId__ on sender side doesn't help either.

Or how to at least overcome Jackson deserialization on receiving side and listen to just raw messages in @RabbitListener with byte[] payload?

CodePudding user response:

See if this sample helps you somehow:

https://github.com/spring-projects/spring-amqp-samples/tree/main/spring-rabbit-json

The docs is here: https://docs.spring.io/spring-amqp/docs/current/reference/html/#json-message-converter

CodePudding user response:

Setting assumeSupportedContentType to false solved the case.

    @Bean
    public Jackson2JsonMessageConverter messageConverter(ObjectMapper objectMapper) {
        var converter = new Jackson2JsonMessageConverter(objectMapper);
        converter.setAssumeSupportedContentType(false);
        return converter;
    }

It allowed me to just receive plain org.springframework.amqp.core.Message with simple byte[] payload, and parse it with properly configured ObjectMapper inside the handler.

  • Related