I found how to do manual ack with functional way.
However, this does not explain how to do manual ack sending after messages.
If there is a networking error between the ack and the sending of the message, I think the message will be lost.
I don't want to lose message.
Versions
- Spring boot : 2.7.1
- Spring Cloud : 3.2.4
This is my current code
@Controller
class MyFunction(private val myService: MyService) {
private val objectMapper = ObjectMapper()
init {
objectMapper.registerKotlinModule()
}
@Bean
fun function(): (Message<ByteArray>) -> Message<MyResponse> = {
val channel = it.headers.get(AmqpHeaders.CHANNEL, Channel::class.java)!!
val deliveryTag = it.headers.get(AmqpHeaders.DELIVERY_TAG, java.lang.Long::class.java)!!.toLong()
val request = objectMapper.readValue(it.payload, MyRequest::class.java)
try {
val response = myService.process(request)
channel.basicAck(deliveryTag, false)
// I think it can potentially cause message loss
MessageBuilder.withPayload(result)
.build()
} catch (ex: Exception) {
channel.basicNack(deliveryTag, false, false)
throw ex
}
}
}
CodePudding user response:
Use a Consumer
to receive the message and StreamBridge
to send (instead of using Function
.
But why do you need manual acks anyway? Throwing the exception will cause the container to nack the message (and ack with normal exit) with auto mode.