I use Spring Cloud Stream with Kafka. I have a topic X with partition Y and consumer group Z. Spring Boot starter parent is 2.7.2, Spring Kafka is 2.8.8:
@StreamListener("input-channel-name")
public void processMessage(final DomainObject domainObject) {
    // some processing
}
It works fine.
I would like to have an endpoint in the app that allows me to re-read/re-process (seek, right?) all the messages in X.Y again. Not after rebalancing (ConsumerSeekAware#onPartitionsAssigned) or after an app restart (KafkaConsumerProperties#resetOffsets), but on demand, like this:
@RestController
@Slf4j
@RequiredArgsConstructor
public class SeekController {

    @GetMapping
    public void seekToBeginningForDomainObject() {
        /**
         * seekToBeginning for X, Y, input-channel-name
         */
    }
}
I just can't achieve that. Is it even possible? I understand that I have to do this at the consumer level, probably on the consumer created for the @StreamListener("input-channel-name") subscription, but I have no clue how to obtain that consumer. How can I execute a seek on demand so that Kafka delivers the messages to the consumer again? I just want to reset the offset for X.Y.Z to 0 so the app loads and processes all the messages again.
CodePudding user response:
KafkaBindingRebalanceListener.onPartitionsAssigned() provides a boolean to indicate whether this is an initial assignment vs. a rebalance assignment.
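For illustration, here is a minimal sketch of such a listener registered as a bean and picked up by the Kafka binder; the class name and the choice to seek to the beginning only on the initial assignment are assumptions, not part of your setup:

@Component
public class InitialSeekRebalanceListener implements KafkaBindingRebalanceListener {

    @Override
    public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
            Collection<TopicPartition> partitions, boolean initial) {
        // 'initial' is true only for the first assignment after startup,
        // not for subsequent rebalance assignments
        if (initial) {
            consumer.seekToBeginning(partitions);
        }
    }
}

(Imports assumed: java.util.Collection, org.apache.kafka.clients.consumer.Consumer, org.apache.kafka.common.TopicPartition, org.springframework.cloud.stream.binder.kafka.KafkaBindingRebalanceListener, org.springframework.stereotype.Component.) This only covers startup-time seeking, not the on-demand case you are asking about.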
Spring Cloud Stream does not currently support arbitrary seeks at runtime, even though the underlying KafkaMessageDrivenChannelAdapter does support access to a ConsumerSeekCallback (which allows arbitrary seeks between polls). It would need an enhancement to the binder to expose that callback.
It is possible, though, to consume idle container events in an event listener; the event contains the consumer, so you could do arbitrary seeks under those conditions.
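A minimal sketch of that approach, assuming an idleEventInterval has been configured for the consumer binding and using a hypothetical AtomicBoolean flag that your REST endpoint (e.g. the SeekController above) would set to request a rewind; the class and method names are illustrative:

@Component
public class OnDemandRewinder {

    // hypothetical trigger: the REST endpoint calls requestRewind()
    private final AtomicBoolean rewindRequested = new AtomicBoolean();

    public void requestRewind() {
        this.rewindRequested.set(true);
    }

    @EventListener
    public void onIdle(ListenerContainerIdleEvent event) {
        // the event carries the consumer and its assigned partitions; with the
        // default (synchronous) event multicaster this runs on the consumer
        // thread, so interacting with the consumer here is safe
        if (this.rewindRequested.compareAndSet(true, false)) {
            event.getConsumer().seekToBeginning(event.getTopicPartitions());
        }
    }
}

(Imports assumed: java.util.concurrent.atomic.AtomicBoolean, org.springframework.context.event.EventListener, org.springframework.kafka.event.ListenerContainerIdleEvent, org.springframework.stereotype.Component.) Idle events are only published if an idle interval is configured (the Kafka binder's idleEventInterval consumer property), so the rewind takes effect the next time the container goes idle rather than immediately.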