I am writing an integration test for an application that connects to Kafka to consume and publish data and for that purpose I am using EmbeddedKafka. Part of the logic is to consume messages with specific offsets. I want to simulate this, therefore my goal is to:
- send some messages to EmbeddedKafka, but with specific offsets
- consume them with the same offsets
This doesn't work now, i.e. I'm sending messages with KafkaHeaders.OFFSET
, but it's ignored, the message that I'm consuming afterwards has a different offset. In fact the offset just starts with 0 and is then incremented.
MessageBuilder<String> messageBuilder = MessageBuilder.withPayload(payload)
.setHeader(KafkaHeaders.TOPIC, topic)
.setHeader(KafkaHeaders.MESSAGE_KEY, key)
.setHeader(KafkaHeaders.PARTITION_ID, partition);
.setHeader(KafkaHeaders.OFFSET, val);
kafkaTemplate.send(messageBuilder.build());
KafkaTemplate<String, String>
is initialised in a standard way. On the other end I consume in a standard way:
private class MessagesWithOffsetsConsumer implements BatchMessageListener<String, String>, ConsumerSeekAware {
MessagesWithOffsetsConsumer() {
}
@Override
public void onMessage(List<ConsumerRecord<String, String>> records) {
records.forEach(record -> {
String id = record.key();
String dataAssetPayload = record.value();
int partitionId = record.partition();
LOGGER.info("Received record: {} offset: {}", id, record.offset());
}
}
In short, offset of a message received in onMessage
is not the same as set during message building.
Is there any way to achieve this?
CodePudding user response:
No, there is no something like an explicit offset value on the ProducerRecord
.
You only can set these props:
public class ProducerRecord<K, V> {
private final String topic;
private final Integer partition;
private final Headers headers;
private final K key;
private final V value;
private final Long timestamp;
And that's really what is done from the KafkaTemplate
logic. The KafkaHeaders.OFFSET
is a consumer side property:
public class ConsumerRecord<K, V> {
private final String topic;
private final int partition;
private final long offset;
private final long timestamp;
private final Headers headers;
private final K key;
private final V value;
In other words: Apache Kafka is not designed for an explicit offset setting.