I'm having a hard time understanding a codebase written in Kotlin that uses Spring Boot WebFlux and reads messages from a Kafka topic using spring-kafka.
The signature of the consumer is the following:
import org.springframework.kafka.annotation.KafkaListener
import reactor.core.Disposable

interface CreationEventConsumer {
    @KafkaListener(
        topics = ["\${topic.east}", "\${topic.west}"],
        containerFactory = "creationFactory"
    )
    fun readEvent(record: String): Disposable
}
As you can see in the signature, the method readEvent returns a Disposable.
I don't understand why the function has to return anything, considering that this is a one-way flow. Does Spring do something with the returned value? Are there any benefits?
The implementation of the method is the following:
override fun readEvent(record: String): Disposable {
    logger.info("$INCOMING")
    return creationEventService.process(record)
        .doOnError {
            logger.error("error")
        }.retry(3)
        .subscribe()
}
The creationEventService.process(record) call returns a Flux.
I also have more questions, because there is a dedicated API that adds reactive programming support for Kafka (reactor-kafka), but this project is using spring-kafka.
Thanks!
CodePudding user response:
Makes no sense to me either; Spring knows nothing about a Disposable.
Returning a value (of any kind) from a @KafkaListener will do nothing without a @SendTo annotation. When the annotation is present, the "reply" is sent to the topic from the annotation or, if it names none, from the replyTo header. If there is no @SendTo, the returned object is ignored (with a DEBUG log).
In this case, the record's offset will be committed, regardless of the success/failure of the async operation.
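One way to keep spring-kafka and still tie the offset to the outcome of the processing is to block on the listener thread instead of calling subscribe(). The following is only a minimal sketch under assumptions: CreationEventService is a hypothetical stand-in for the service in the question (only its Flux-returning process method comes from the post), BlockingCreationEventConsumer is a made-up name, and the class would still have to be registered as a Spring bean.

```kotlin
import org.slf4j.LoggerFactory
import org.springframework.kafka.annotation.KafkaListener
import reactor.core.publisher.Flux

// Hypothetical stand-in for the service in the question; the element
// type of the Flux is not given there, so it is left as Flux<*>.
interface CreationEventService {
    fun process(record: String): Flux<*>
}

// Hypothetical listener class; it must be a Spring bean to be picked up.
class BlockingCreationEventConsumer(
    private val creationEventService: CreationEventService
) {
    private val logger = LoggerFactory.getLogger(javaClass)

    @KafkaListener(
        topics = ["\${topic.east}", "\${topic.west}"],
        containerFactory = "creationFactory"
    )
    fun readEvent(record: String) {
        creationEventService.process(record)
            // Logs every failed attempt, including the ones that are retried.
            .doOnError { e -> logger.error("processing failed", e) }
            .retry(3)
            // Wait on the listener thread. An error that survives the
            // retries is rethrown here, so the listener container sees
            // the failure instead of a normal return.
            .blockLast()
    }
}
```

Because the method now returns only after the Flux completes, and throws if it fails, the container's error handling applies to the record rather than the offset being committed while the work is still in flight. The trade-off is that the pipeline is no longer non-blocking.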
You are correct; reactor-kafka should be used for reactive workloads, not Spring for Apache Kafka.
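For comparison, a rough sketch of what the same flow could look like with reactor-kafka, where the offset is acknowledged only after the processing Flux completes. The function names receiverOptions and consume, the String key/value types, and the process parameter are assumptions made for illustration; they are not part of the original project.

```kotlin
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import reactor.core.Disposable
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.kafka.receiver.KafkaReceiver
import reactor.kafka.receiver.ReceiverOptions

// Hypothetical helper: builds receiver options for String keys and values.
fun receiverOptions(
    bootstrapServers: String,
    groupId: String,
    topics: List<String>
): ReceiverOptions<String, String> {
    val props = mapOf<String, Any>(
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers,
        ConsumerConfig.GROUP_ID_CONFIG to groupId,
        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java
    )
    return ReceiverOptions.create<String, String>(props).subscription(topics)
}

// Hypothetical helper: `process` plays the role of creationEventService.process.
fun consume(
    options: ReceiverOptions<String, String>,
    process: (String) -> Flux<*>
): Disposable =
    KafkaReceiver.create(options)
        .receive()
        // concatMap handles one record at a time, in order.
        .concatMap { record ->
            process(record.value())
                // Acknowledge the offset only after processing completed.
                .then(Mono.fromRunnable<Void> { record.receiverOffset().acknowledge() })
        }
        // Here the Disposable is meaningful: disposing it stops the consumer.
        .subscribe()
```

In this sketch an error from process terminates the whole stream, so a real application would still need its own retry and error-handling policy around the pipeline.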