Assume the following config, applied on both the sending and receiving sides:
@Configuration
@EnableRabbit
public class EventsConfig {

    @Bean
    public Jackson2JsonMessageConverter messageConverter(ObjectMapper objectMapper) {
        return new Jackson2JsonMessageConverter(objectMapper);
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter jackson2JsonMessageConverter) {
        final var rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter);
        return rabbitTemplate;
    }
}
Assume also that there is a simple Spring Boot RabbitMQ listener on the receiving side, declared like this:
@RabbitListener(queues = "${amqp.inbox}")
public void listen(org.springframework.amqp.core.Message message) {
    ...
}
An attempt to receive a List<TaskAssignment> (where TaskAssignment is a simple POJO holding two UUIDs), sent via RabbitTemplate, fails on the receiver side with this exception:
accounting-service_1 | 01:26:39.610 WARN [org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler$DefaultExceptionStrategy] - Fatal message conversion error; message rejected; it will be dropped or routed to a dead letter exchange, if so configured: (Body:'[{"assigneeId":"1033c1ab-f277-4162-a711-722966bf69ec","taskId":"ae408e2d-39e0-4094-bf52-8c171d965be7"}]' MessageProperties [headers={__ContentTypeId__=java.lang.Object, __TypeId__=io.vavr.collection.List$Cons}, messageId=83f6bb30-a2b8-434a-867e-08684241e9b0, type=TASKS_REASSIGNED, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=outbox, receivedRoutingKey=, deliveryTag=3, consumerTag=amq.ctag-PodhhDJx6JtYsjIg6Cof1Q, consumerQueue=accounting-service])
accounting-service_1 | 01:26:39.610 ERROR [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer] - Execution of Rabbit message listener failed, and the error handler threw an exception
accounting-service_1 | org.springframework.amqp.AmqpRejectAndDontRequeueException: Error Handler converted exception to fatal
accounting-service_1 | at org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler.handleError(ConditionalRejectingErrorHandler.java:146)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeErrorHandler(AbstractMessageListenerContainer.java:1460)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.handleListenerException(AbstractMessageListenerContainer.java:1744)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1519)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:968)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:914)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1289)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1195)
accounting-service_1 | at java.base/java.lang.Thread.run(Unknown Source)
accounting-service_1 | Caused by: org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Failed to convert message
accounting-service_1 | at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:146)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1656)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1575)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1563)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1554)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1498)
accounting-service_1 | ... 6 common frames omitted
accounting-service_1 | Caused by: org.springframework.amqp.support.converter.MessageConversionException: Failed to convert Message content
accounting-service_1 | at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.doFromMessage(AbstractJackson2MessageConverter.java:294)
accounting-service_1 | at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.fromMessage(AbstractJackson2MessageConverter.java:271)
accounting-service_1 | at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.fromMessage(AbstractJackson2MessageConverter.java:251)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener.extractMessage(AbstractAdaptableMessageListener.java:344)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter$MessagingMessageConverterAdapter.extractPayload(MessagingMessageListenerAdapter.java:347)
accounting-service_1 | at org.springframework.amqp.support.converter.MessagingMessageConverter.fromMessage(MessagingMessageConverter.java:132)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.toMessagingMessage(MessagingMessageListenerAdapter.java:229)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:136)
accounting-service_1 | ... 11 common frames omitted
accounting-service_1 | Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize value of type `byte` from Object value (token `JsonToken.START_OBJECT`)
accounting-service_1 | at [Source: (String)"[{"assigneeId":"1033c1ab-f277-4162-a711-722966bf69ec","taskId":"ae408e2d-39e0-4094-bf52-8c171d965be7"}]"; line: 1, column: 2] (through reference chain: byte[][0])
accounting-service_1 | at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)
The Vavr module for Jackson is registered in the ObjectMapper, so the root cause is not that the collection type itself cannot be serialized.
A notable part of the log output is the headers:
headers={__ContentTypeId__=java.lang.Object, __TypeId__=io.vavr.collection.List$Cons}
My theory is this: the listener received a JSON array, and Jackson needs an explicit TypeReference to know which POJO to deserialize the array's items into. Nobody gave Jackson2JsonMessageConverter the content type, so the conversion failed there.
The question is: how do I provide Jackson2JsonMessageConverter with that information? RabbitTemplate doesn't seem to offer any means to do so. Explicitly setting __ContentTypeId__ on the sender side doesn't help either.
Or, at the very least, how do I bypass Jackson deserialization on the receiving side and just receive raw messages with a byte[] payload in @RabbitListener?
CodePudding user response:
See if this sample helps you somehow:
https://github.com/spring-projects/spring-amqp-samples/tree/main/spring-rabbit-json
The docs are here: https://docs.spring.io/spring-amqp/docs/current/reference/html/#json-message-converter
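As a rough sketch of the pattern those docs describe: when a @RabbitListener method declares a single typed payload parameter instead of a raw Message, the JSON converter can use that parameter type as the conversion target. The class name TypedTaskAssignmentListener and the TaskAssignment POJO below are hypothetical, with field names taken from the JSON body in the log. Whether the __TypeId__ header set by the sender interferes has not been verified against this exact setup, so treat it as a starting point rather than a confirmed fix.

```java
import java.util.List;
import java.util.UUID;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
class TypedTaskAssignmentListener {

    // Hypothetical POJO; field names follow the JSON body shown in the log.
    public static class TaskAssignment {
        public UUID assigneeId;
        public UUID taskId;
    }

    // The generic parameter type (not a raw Message) is what the converter
    // can use as the target type for the JSON array.
    @RabbitListener(queues = "${amqp.inbox}")
    public void listen(List<TaskAssignment> assignments) {
        for (TaskAssignment assignment : assignments) {
            // Replace with real handling logic.
            System.out.println(assignment.taskId + " -> " + assignment.assigneeId);
        }
    }
}
```

This uses java.util.List on the receiving side; receiving into a Vavr list would additionally depend on the Vavr Jackson module being registered in the converter's ObjectMapper.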
CodePudding user response:
Setting assumeSupportedContentType to false solved the problem.
@Bean
public Jackson2JsonMessageConverter messageConverter(ObjectMapper objectMapper) {
    var converter = new Jackson2JsonMessageConverter(objectMapper);
    converter.setAssumeSupportedContentType(false);
    return converter;
}
It allowed me to receive a plain org.springframework.amqp.core.Message with a simple byte[] payload and parse it with a properly configured ObjectMapper inside the handler.
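For illustration, parsing inside the handler could look roughly like the sketch below. The class name TaskAssignmentListener, the parse helper, and the TaskAssignment POJO are assumptions made for this example (field names are taken from the JSON body in the log), and it deserializes into java.util.List rather than a Vavr list to keep the sketch dependency-light.

```java
import java.io.IOException;
import java.util.List;
import java.util.UUID;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
class TaskAssignmentListener {

    // Hypothetical POJO; field names follow the JSON body shown in the log.
    public static class TaskAssignment {
        public UUID assigneeId;
        public UUID taskId;
    }

    // Carries the element type of the JSON array, which Jackson cannot
    // recover from a plain List.class because of type erasure.
    private static final TypeReference<List<TaskAssignment>> ASSIGNMENTS =
            new TypeReference<List<TaskAssignment>>() {};

    private final ObjectMapper objectMapper;

    TaskAssignmentListener(ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
    }

    @RabbitListener(queues = "${amqp.inbox}")
    public void listen(Message message) throws IOException {
        List<TaskAssignment> assignments = parse(message.getBody());
        // Replace with real handling logic.
        System.out.println("Received " + assignments.size() + " assignment(s)");
    }

    // Deserializes the raw message body using the explicit TypeReference.
    List<TaskAssignment> parse(byte[] body) throws IOException {
        return objectMapper.readValue(body, ASSIGNMENTS);
    }
}
```

Keeping the parsing in a separate method makes it possible to exercise the deserialization without a broker, for example by passing the logged JSON body as bytes.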