I am writing a sample Spring batch Kafka integration, with Remote Chunking. At first a master read some sample record (Item.java) in some Chunks. Then send this chunks to a (Spring Integration) channel and a Kafka producer send this Chunks to a Kafka topic. The problem is KafkaTemplate cant serialize the ChenkRequest.
if I use :
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
then this error raise:
org.apache.kafka.common.errors.SerializationException: Can't convert value of class org.springframework.batch.integration.chunk.ChunkRequest to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
and if I write a custom serializer like:
public class CustomSerializer implements Serializer<ChunkRequest<Item>> {
private final ObjectMapper objectMapper = new ObjectMapper();
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@Override
public byte[] serialize(String topic, ChunkRequest<Item> data) {
try {
System.out.println("Serializing...");
return objectMapper.writeValueAsBytes(data);
} catch (Exception e) {
throw new SerializationException("Error when serializing MessageDto to byte[]");
}
}
@Override
public void close() {
}
}
and this config:
spring.kafka.producer.value-serializer=com.example.myremotechunking.batch.CustomSerializer
this error would raise:
com.fasterxml.jackson.databind.JsonMappingException: Infinite recursion (StackOverflowError) (through reference chain: org.springframework.batch.core.StepExecution["jobExecution"]->org.springframework.batch.core.JobExecution["stepExecutions"]
So, what is the problem!?
CodePudding user response:
The org:spring-projects org.springframework.batch.integration.chunk.ChunkRequest
is not JSON-compatible. Consider to write a Serializer<ChunkRequest<Item>>
which would serialize using standard Java serialization feature. You may use a org.springframework.core.serializer.DefaultSerializer.serializeToByteArray()
from Spring as a delegate.
Probably you would need a similar deserialzier on the other side...
CodePudding user response:
Thank you Artem. that worked! I wrote a customer serializer:
public class CustomSerializer implements Serializer<Object> {
@Override
public byte[] serialize(String topic, Object data) {
try {
return new DefaultSerializer().serializeToByteArray(data);
} catch (Exception e) {
throw new SerializationException("Error when serializing MessageDto to byte[]");
}
}
}
with config:
spring.kafka.producer.value-serializer=com.example.myremotechunking.batch.CustomSerializer
and a custom deserializer:
public class CustomDeserializer implements Deserializer<Object> {
@Override
public Object deserialize(String s, byte[] bytes) {
try {
return new DefaultDeserializer().deserialize(new ByteArrayInputStream(bytes));
} catch (IOException e) {
e.printStackTrace();
return null;
}
}
}
with config:
spring.kafka.consumer.value-deserializer=com.example.myremotechunking.batch.CustomDeserializer