Kafka transaction rollback not working with 3 topics for RecordTooLargeException


I post 3 messages to 3 topics inside a transaction. If any send throws an exception, all the messages should be rolled back.

But in my case that does not happen when I simulate the exception below for the 3rd topic: org.apache.kafka.common.errors.RecordTooLargeException: The message is 117440606 bytes

To get the exception, I programmatically increase the size of the message posted to the 3rd topic (the price topic).

The messages are sent to the first 2 topics successfully and the 3rd one fails. Since this is a transaction, all messages should be rolled back, but topics 1 and 2 still receive the message every time.

Yet the log shows "Transaction rolled back".

How can I fix this?


2022-03-23 21:16:59.690 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  c.a.m.r.producer.KafkaProducer - @@@ --- Sending Data to  Item , price, Inventory  ----- 
2022-03-23 21:16:59.733 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.s.LoggingProducerListener - Exception thrown when sending a message with key='String' and payload='{"sku":"String","lowestOriginalPrice":...' to topic PRICE-TOPIC: 
**org.apache.kafka.common.errors.RecordTooLargeException**: The message is 117440606 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
2022-03-23 21:16:59.733 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  o.a.k.c.producer.KafkaProducer - [Producer clientId=raw-item-producer-client-2, transactionalId=tx-5b01ad71-f754-44e0-9c52-4774a482bc1d0] Aborting incomplete transaction 
2022-03-23 21:16:59.737 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  o.a.k.c.producer.KafkaProducer - [Producer clientId=raw-item-producer-client-1, transactionalId=tx-5b01ad71-f754-44e0-9c52-4774a482bc1draw-item-processor-group-id.OSMI_C02_CATALOG_MKPDOMAIN.0] **Aborting incomplete transaction** 
2022-03-23 21:16:59.738 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.l.KafkaMessageListenerContainer - Transaction rolled back 
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.albertsons.mkp.rawitemprocessor.consumer.KafkaConsumer.receive(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>) throws java.io.IOException' threw exception; nested exception is org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.RecordTooLargeException: The message is 117440606 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.; nested exception is org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.RecordTooLargeException: The message is 117440606 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.

2022-03-23 21:17:00.250 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  c.a.m.r.producer.KafkaProducer - @@@ --- Sending Data to  Item , price, Inventory  ----- 
2022-03-23 21:17:00.294 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.s.LoggingProducerListener - Exception thrown when sending a message with key='String' and payload='{"sku":"String","lowestOriginalPrice":"String","lowestPrice":"String","updatedAt":"String","createdA...' to topic PRICE-TOPIC: 
org.apache.kafka.common.errors.RecordTooLargeException: The message is 117440606 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
2022-03-23 21:17:00.295 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  o.a.k.c.producer.KafkaProducer - [Producer clientId=raw-item-producer-client-2, transactionalId=tx-5b01ad71-f754-44e0-9c52-4774a482bc1d0] Aborting incomplete transaction 
2022-03-23 21:17:00.298 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  o.a.k.c.producer.KafkaProducer - [Producer clientId=raw-item-producer-client-1, transactionalId=tx-5b01ad71-f754-44e0-9c52-4774a482bc1draw-item-processor-group-id.OSMI_C02_CATALOG_MKPDOMAIN.0] **Aborting incomplete transaction** 
2022-03-23 21:17:00.308 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.l.**KafkaMessageListenerContainer - Transaction rolled back** 
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.albertsons.mkp.rawitemprocessor.consumer.KafkaConsumer.receive(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>) throws java.io.IOException' threw exception; nested exception is **org.springframework.kafka.KafkaException: Send failed**; nested exception is org.apache.kafka.common.errors.**RecordTooLargeException**: The message is 117440606 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.; nested exception is org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.RecordTooLargeException: The message is 117440606 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.

Rolled back records remain in the log.

Kafka adds a marker to the log to indicate whether the transaction was committed or rolled back.

By default, consumers will receive all records, even if they are rolled back.

Consumers must be configured with isolation.level=read_committed to avoid seeing rolled back records.
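As a minimal sketch of that setting for a plain (non-Spring) consumer, the properties could be built like this. The class name, broker address and group id are placeholders I made up for illustration; only the property keys and the read_committed value are actual Kafka consumer settings.

```java
import java.util.Properties;

class ReadCommittedConsumerProps {

    // Builds consumer properties; bootstrapServers and groupId are caller-supplied placeholders.
    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Hide records that belong to aborted (rolled back) transactions.
        props.put("isolation.level", "read_committed");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092", "demo-group");
        System.out.println(props.getProperty("isolation.level"));
    }
}
```

These properties would then be passed to the consumer constructor. Any tool that reads the topic with the default isolation level will keep showing the rolled back records.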


Controls how to read messages written transactionally. If set to read_committed, consumer.poll() will only return transactional messages which have been committed. If set to read_uncommitted (the default), consumer.poll() will return all messages, even transactional messages which have been aborted. Non-transactional messages will be returned unconditionally in either mode.

Messages will always be returned in offset order. Hence, in read_committed mode, consumer.poll() will only return messages up to the last stable offset (LSO), which is the one less than the offset of the first open transaction. In particular any messages appearing after messages belonging to ongoing transactions will be withheld until the relevant transaction has been completed. As a result, read_committed consumers will not be able to read up to the high watermark when there are in flight transactions.
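To make the two isolation levels concrete, here is a toy in-memory model of a single partition. It is only a sketch and not how Kafka is implemented: the Entry and Kind types are invented for the example, and it assumes each transaction id is used for exactly one transaction.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class IsolationToyModel {

    enum Kind { DATA, COMMIT, ABORT }

    // One log entry; txId is null for data sent outside a transaction.
    static final class Entry {
        final Kind kind;
        final String txId;
        final String value;

        Entry(Kind kind, String txId, String value) {
            this.kind = kind;
            this.txId = txId;
            this.value = value;
        }
    }

    // read_uncommitted: every data record is returned, aborted or not.
    static List<String> readUncommitted(List<Entry> log) {
        List<String> out = new ArrayList<>();
        for (Entry e : log) {
            if (e.kind == Kind.DATA) {
                out.add(e.value);
            }
        }
        return out;
    }

    // read_committed: stop at the first record of a still-open transaction
    // and skip records whose transaction ended with an ABORT marker.
    static List<String> readCommitted(List<Entry> log) {
        Set<String> finished = new HashSet<>();
        Set<String> aborted = new HashSet<>();
        for (Entry e : log) {
            if (e.kind != Kind.DATA) {
                finished.add(e.txId);
                if (e.kind == Kind.ABORT) {
                    aborted.add(e.txId);
                }
            }
        }
        List<String> out = new ArrayList<>();
        for (Entry e : log) {
            if (e.kind != Kind.DATA) {
                continue; // markers are never handed to the application
            }
            if (e.txId != null && !finished.contains(e.txId)) {
                break; // open transaction: everything from here on is withheld
            }
            if (e.txId == null || !aborted.contains(e.txId)) {
                out.add(e.value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Entry> log = new ArrayList<>();
        log.add(new Entry(Kind.DATA, null, "non-transactional"));
        log.add(new Entry(Kind.DATA, "tx-1", "first"));
        log.add(new Entry(Kind.DATA, "tx-1", "second"));
        log.add(new Entry(Kind.ABORT, "tx-1", null));
        System.out.println("uncommitted: " + readUncommitted(log));
        System.out.println("committed: " + readCommitted(log));
    }
}
```

In this model the aborted records are still in the log, which is why a read_uncommitted reader sees "first" and "second" while a read_committed reader only sees the non-transactional record.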

When using Spring Boot's own property (spring.kafka.consumer.isolation-level), the value is read-committed, not read_committed.

Your IDE should suggest the proper values.

(Although I see that Boot works with read_uncommitted too).

This works as expected for me.

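import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootApplication
public class So71591355Application {

    public static void main(String[] args) {
        SpringApplication.run(So71591355Application.class, args);
    }

    // Uses the application's consumer settings (read-committed).
    @KafkaListener(id = "so71591355", topics = "so71591355")
    void listen1(String in) {
        System.out.println("committed: " + in);
    }

    // Overrides the isolation level for this listener only.
    @KafkaListener(id = "so71591355-2", topics = "so71591355",
            properties = "isolation.level:read_uncommitted")
    void listen2(String in) {
        System.out.println("uncommitted: " + in);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so71591355").partitions(1).replicas(1).build();
    }

    @Bean
    ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("so71591355", "non-transactional");
            try {
                template.executeInTransaction(t -> {
                    t.send("so71591355", "first");
                    t.send("so71591355", "second");
                    // Larger than max.request.size, so this send fails and the transaction aborts.
                    t.send("so71591355", new String(new byte[2000000]));
                    return null;
                });
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        };
    }

}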


  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 :: Spring Boot ::                (v2.6.4)

org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.RecordTooLargeException: The message is 2000088 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:660)
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:403)
    at com.example.demo.So71591355Application.lambda$1(So71591355Application.java:49)
    at org.springframework.kafka.core.KafkaTemplate.executeInTransaction(KafkaTemplate.java:507)
    at com.example.demo.So71591355Application.lambda$0(So71591355Application.java:44)
    at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:768)
    at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:758)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:310)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1312)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1301)
    at com.example.demo.So71591355Application.main(So71591355Application.java:19)
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 2000088 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
uncommitted: non-transactional
committed: non-transactional
uncommitted: first
uncommitted: second

MCRE Code sample -

application.yml file

spring:
  profiles :
    active : local
---
spring:
  profiles : local
  kafka:
    producer:
      client-id: raw-item-producer-client
      bootstrap-servers: localhost:9092,localhost:9093,localhost:9094
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      transaction-id-prefix: tx-${random.uuid}
      properties:
        enable.idempotence: true
        acks: all
        retries: 10
    consumer:
      bootstrap-servers: localhost:9092,localhost:9093,localhost:9094
      groupId: raw-item-processor-group-id
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      enable-auto-commit: false
      auto-offset-reset: earliest
      isolation-level: read-committed
  application:
    name: osmi-rawitemprocessor-dev
# TestPublisher will post a sample data to TEST-TOPIC
# ConsumerService will consume the data and do little modification , then send to 
# If any of the topic causes exception, then data will be reverted back from all 
# 3 topics
raw:
  topic:
    name: TEST-TOPIC


price:
  topic:
    name: PRICE-TOPIC

item:
  topic:
    name: ITEM-TOPIC

    name: TEST-TOPIC.DLT

Main application

package com.test.transaction.demo;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class DemoApplication {

    @Autowired
    TestPublisher testPublisher;

    public static void main(String[] args) throws InterruptedException {
        SpringApplication.run(DemoApplication.class, args);
    }

    // Publishes one test message at startup.
    @Bean
    ApplicationRunner runner() {
        return args -> testPublisher.publishTestMessage();
    }
}

Sample publisher, which sends a message to TEST-TOPIC

package com.test.transaction.demo;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class TestPublisher {

    @Value(value = "${raw.topic.name}")
    private String testTopic;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private static final String TEST_DATA = "My test Message";

    public void publishTestMessage() {
        ProducerRecord<String, String> pr = new ProducerRecord<String, String>(testTopic, "TestKey-001", TEST_DATA);
        // The template is transactional (transaction-id-prefix is set), so send inside a local transaction.
        kafkaTemplate.executeInTransaction(
                kt-> {
                    kt.send(pr);
                    return true;
                });
    }
}

ConsumerService gets the message from TEST-TOPIC and posts it to the other 3 topics


    package com.test.transaction.demo;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Service;

    @Slf4j
    @Service
    public class ConsumerService {
        @Value(value = "${inventory.topic.name}")
        private String inventoryTopic;
        @Value(value = "${price.topic.name}")
        private String priceTopic;
        @Value(value = "${item.topic.name}")
        private String itemTopic;
        @Autowired
        private ProducerService producerService;

        @KafkaListener(topics = "${raw.topic.name}")
        public void receive(ConsumerRecord<String, String> cr ) throws Exception {
            log.info("Consumer Service received payload. key :{} , value : {}", cr.key() , cr.value());
            splitRawItemAndPost(cr.value());
        }

        // Builds one record per target topic and hands all three to ProducerService.
        private void splitRawItemAndPost(String consumerData) {
            ProducerRecord<String,String> itemRecord = getItemData(consumerData);
            ProducerRecord<String,String> inventoryRecord = getInventoryData(consumerData);
            //simulated for Very Large Data
            ProducerRecord<String,String> priceRecord = getPriceData(consumerData);
            producerService.sendAllRecords(itemRecord, inventoryRecord, priceRecord);
        }

        private ProducerRecord<String, String> getItemData(String consumerData) {
            String data = "Item data : " + consumerData;
            ProducerRecord<String,String> producerRecord= buildProducerRecord(itemTopic,"ItemId-001",data);
            return producerRecord;
        }

        private ProducerRecord<String, String> getInventoryData(String consumerData){
            String data = "Inventory data : " + consumerData;
            ProducerRecord<String,String> producerRecord= buildProducerRecord(inventoryTopic,"InventoryId-001", data);
            return producerRecord;
        }

        private ProducerRecord<String, String> getPriceData(String consumerData) {
            //String data = "This is Price data";
            //ProducerRecord<String,String> producerRecord= buildProducerRecord(priceTopic,"PriceId-001", data);
            String data = new String(new byte[2000000]) + consumerData; // Very Large Data
            ProducerRecord<String,String> producerRecord= buildProducerRecord(priceTopic,"PriceId-001", data);
            return producerRecord;
        }

        private ProducerRecord<String, String> buildProducerRecord(String topic, String key,String value) {
            return new ProducerRecord<String, String>(topic,key,value);
        }
    }


ProducerService finally sends the data to the 3 topics (item, inventory, price) using the executeInTransaction method.

Posting to the price topic is simulated with a very large message (RecordTooLargeException). The log shows "Transaction rolled back", but the messages are still posted to the item and inventory topics successfully. I looked at the messages with the KAFKA-TOOL editor.


package com.test.transaction.demo;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class ProducerService {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendAllRecords(ProducerRecord<String, String> item, ProducerRecord<String, String> inventory,
                               ProducerRecord<String, String> price){

        log.info("--- Sending Data to  Item , price, Inventory  -----");
        try {
            // All three sends share one local transaction; the oversized price record makes the last send fail.
            kafkaTemplate.executeInTransaction(
                    kt -> {
                        kt.send(item);
                        kt.send(inventory);
                        kt.send(price);
                        return null;
                    });

        }catch (Exception ex){
            throw ex;
        }
    }
}


Configuration file: I plan to add retries, handling of recoverable and non-recoverable exceptions, saving exception data to a DB, and sending data to a DLQ. None of this is written yet. I also plan to add an AfterRollbackProcessor, but I am not sure how to do that given this transaction issue.


package com.test.transaction.demo;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;

@Slf4j
@Configuration
@EnableKafka
public class KafkaConfig {

}
