Home > Net >  Problem Serializing Spring batch Kafka ChunkRequest
Problem Serializing Spring batch Kafka ChunkRequest

Time:02-16

I am writing a sample Spring batch Kafka integration, with Remote Chunking. At first a master read some sample record (Item.java) in some Chunks. Then send this chunks to a (Spring Integration) channel and a Kafka producer send this Chunks to a Kafka topic. The problem is KafkaTemplate cant serialize the ChenkRequest.

if I use :

spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

then this error raise:

org.apache.kafka.common.errors.SerializationException: Can't convert value of class org.springframework.batch.integration.chunk.ChunkRequest to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer

and if I write a custom serializer like:

public class CustomSerializer implements Serializer<ChunkRequest<Item>> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public byte[] serialize(String topic, ChunkRequest<Item> data) {
        try {
            System.out.println("Serializing...");
            return objectMapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new SerializationException("Error when serializing MessageDto to byte[]");
        }
    }

    @Override
    public void close() {
    }
}

and this config:

spring.kafka.producer.value-serializer=com.example.myremotechunking.batch.CustomSerializer

this error would raise:

com.fasterxml.jackson.databind.JsonMappingException: Infinite recursion (StackOverflowError) (through reference chain: org.springframework.batch.core.StepExecution["jobExecution"]->org.springframework.batch.core.JobExecution["stepExecutions"]

So, what is the problem!?

CodePudding user response:

The org:spring-projects org.springframework.batch.integration.chunk.ChunkRequest is not JSON-compatible. Consider to write a Serializer<ChunkRequest<Item>> which would serialize using standard Java serialization feature. You may use a org.springframework.core.serializer.DefaultSerializer.serializeToByteArray() from Spring as a delegate.

Probably you would need a similar deserialzier on the other side...

CodePudding user response:

Thank you Artem. that worked! I wrote a customer serializer:

public class CustomSerializer implements Serializer<Object> {

    @Override
    public byte[] serialize(String topic, Object data) {
        try {
            return new DefaultSerializer().serializeToByteArray(data);
        } catch (Exception e) {
            throw new SerializationException("Error when serializing MessageDto to byte[]");
        }
    }
}

with config:

spring.kafka.producer.value-serializer=com.example.myremotechunking.batch.CustomSerializer

and a custom deserializer:

public class CustomDeserializer implements Deserializer<Object> {
    @Override
    public Object deserialize(String s, byte[] bytes) {
        try {
            return new DefaultDeserializer().deserialize(new ByteArrayInputStream(bytes));
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }
}

with config:

spring.kafka.consumer.value-deserializer=com.example.myremotechunking.batch.CustomDeserializer
  • Related