Home > Net >  Manual ack after message send with Spring Cloud Stream RabbitMQ (Functional)
Manual ack after message send with Spring Cloud Stream RabbitMQ (Functional)

Time:09-07

I found how to do manual ack with functional way.

However, this does not explain how to do manual ack sending after messages.
If there is a networking error between the ack and the sending of the message, I think the message will be lost.
I don't want to lose message.

Versions

  • Spring boot : 2.7.1
  • Spring Cloud : 3.2.4

This is my current code

@Controller
class MyFunction(private val myService: MyService) {
    private val objectMapper = ObjectMapper()

    init {
        objectMapper.registerKotlinModule()
    }
    
    @Bean
    fun function(): (Message<ByteArray>) -> Message<MyResponse> = {
        val channel = it.headers.get(AmqpHeaders.CHANNEL, Channel::class.java)!!
        val deliveryTag = it.headers.get(AmqpHeaders.DELIVERY_TAG, java.lang.Long::class.java)!!.toLong()
        
        val request = objectMapper.readValue(it.payload, MyRequest::class.java)
        
        try {
            val response = myService.process(request)
            
            channel.basicAck(deliveryTag, false)
            // I think it can potentially cause message loss
            MessageBuilder.withPayload(result)
                .build()
        } catch (ex: Exception) {
            channel.basicNack(deliveryTag, false, false)
            throw ex
        }
    }
}

CodePudding user response:

Use a Consumer to receive the message and StreamBridge to send (instead of using Function.

But why do you need manual acks anyway? Throwing the exception will cause the container to nack the message (and ack with normal exit) with auto mode.

  • Related