Send / Receive messages through Kafka using Apache Avro but with different schemas


I'm new to Apache Avro. Let me describe the problem. I'm trying to send messages from a producer application to a consumer application using Apache Kafka. The message schemas are not the same.

Producer schema (User.avsc):

{
  "name": "User",
  "namespace": "avro",
  "type": "record",
  "fields": [
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "favorite_number",
      "type": "int"
    }
  ]
}

Consumer schema (User.avsc):

{
  "name": "User",
  "namespace": "avro",
  "type": "record",
  "fields": [
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "favorite_number",
      "type": "int"
    },
    {
      "name": "favorite_color",
      "type": "string",
      "default": "green"
    }
  ]
}

Classes:

public class AvroSerializer<T extends SpecificRecordBase> implements Serializer<T> {

    @Override
    public byte[] serialize(String topic, T data) {
        try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
            if (data != null) {
                // Raw binary encoding: neither the schema nor any header is
                // written alongside the data
                BinaryEncoder binaryEncoder = EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null);
                DatumWriter<T> datumWriter = new SpecificDatumWriter<>(data.getSchema());
                datumWriter.write(data, binaryEncoder);
                binaryEncoder.flush();
                return byteArrayOutputStream.toByteArray();
            }
        } catch (Exception e) {
            throw new RuntimeException("An exception occurred during serialization", e);
        }
        return null;
    }
}
public class AvroDeserializer<T extends SpecificRecordBase> implements Deserializer<T> {

    protected final Class<T> targetType;

    public AvroDeserializer(Class<T> targetType) {
        this.targetType = targetType;
    }

    @Override
    public T deserialize(String topic, byte[] bytes) {
        try {
            if (bytes != null) {
                // NB: the generated class's schema is used as both the writer
                // and the reader schema here
                DatumReader<T> datumReader =
                        new SpecificDatumReader<>(targetType.getDeclaredConstructor().newInstance().getSchema());
                Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
                return datumReader.read(null, decoder);
            }
        } catch (Exception e) {
            throw new RuntimeException("An exception occurred during deserialization", e);
        }
        return null;
    }
}
public class UserProducer {

    public static void main(String[] args) {
        UserProducer userProducer = new UserProducer();
        userProducer.writeUser();
    }

    public void writeUser() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroSerializer.class);

        Producer<String, SpecificRecord> recordProducer = new KafkaProducer<>(properties);

        User user = User.newBuilder()
                .setName("Bob")
                .setFavoriteNumber(666)
                .build();

        ProducerRecord<String, SpecificRecord> record = new ProducerRecord<>("avro.User", null, user);
        recordProducer.send(record);
        recordProducer.flush();
        recordProducer.close();
    }
}
public class Consumer {

    public static void main(String[] args) {
        Consumer consumer = new Consumer();
        consumer.readMessages();
    }

    public void readMessages() {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "specific-record-consumer-group");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        KafkaConsumer<String, User> consumer = new KafkaConsumer<>(properties,
                new StringDeserializer(), new AvroDeserializer<>(User.class));

        consumer.subscribe(Collections.singletonList("avro.User"));

        while (true) {
            consumer.poll(Duration.ofMillis(100)).forEach(System.out::println);
            consumer.commitAsync();
        }
    }
}

Of course it works fine when the schemas are identical. The problem is schema evolution: on the receiver side there is a new field with a default value that should be set, but instead I get an exception:

Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition avro.User-0 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: java.lang.RuntimeException: An exception occurred during deserialization
    at serializer.AvroDeserializer.deserialize(AvroDeserializer.java:28)
    at serializer.AvroDeserializer.deserialize(AvroDeserializer.java:10)
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1306)
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3500(Fetcher.java:128)
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1537)
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1373)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:679)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1313)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
    at consumer.Consumer.readMessages(Consumer.java:34)
    at consumer.Consumer.main(Consumer.java:18)
Caused by: java.io.EOFException
    at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:509)
    at org.apache.avro.io.BinaryDecoder.readLong(BinaryDecoder.java:181)
    at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:279)
    at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:298)
    at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:220)
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:456)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:191)
    at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
    at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at serializer.AvroDeserializer.deserialize(AvroDeserializer.java:25)
    ... 13 more

The pom.xml in both applications is more or less the same:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>kafka-avro-producer</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.30</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.30</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.9.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-compiler</artifactId>
            <version>1.9.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-maven-plugin</artifactId>
            <version>1.9.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>1.9.2</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                            <goal>protocol</goal>
                            <goal>idl-protocol</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
                            <outputDirectory>${project.basedir}/target/generated-sources/avro/</outputDirectory>
                            <stringType>String</stringType>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

What am I doing wrong?

I tried with identical schemas and it works, but I cannot figure out why the receiver side does not handle the missing optional field.

CodePudding user response:

Avro needs both the writer schema and the reader schema when deserializing binary data. Your deserializer builds a SpecificDatumReader from the reader schema alone, so Avro assumes the bytes were written with that exact schema. The writer schema can instead be registered with BinaryMessageDecoder.addSchema.

Here's a simple unit test that shows the concept.

@Test
void avroRoundTrip() throws IOException {
    User u = User.newBuilder()
        .setName("foobar")
        .setFavoriteNumber(0)
        .build();
    ByteBuffer bb = u.toByteBuffer();

    ColoredUser.getDecoder().addSchema(User.SCHEMA$);
    ColoredUser cu = ColoredUser.fromByteBuffer(bb);
    System.out.println(cu);
    // {"name": "foobar", "favorite_number": 0, "favorite_color": "green"}
}

You already know what types you have at runtime, so just write type-specific deserializers (e.g. implements Deserializer<ColoredUser>), and don't try to use generics unless you're building a shared library.
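
For illustration, a minimal sketch of such a deserializer, assuming the producer sends the output of User.toByteBuffer() (Avro's single-object encoding, as in the test above) and that ColoredUser is the class generated from the consumer's schema:

import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.kafka.common.serialization.Deserializer;

public class ColoredUserDeserializer implements Deserializer<ColoredUser> {

    public ColoredUserDeserializer() {
        // Register every writer schema the decoder may have to resolve
        ColoredUser.getDecoder().addSchema(User.SCHEMA$);
    }

    @Override
    public ColoredUser deserialize(String topic, byte[] bytes) {
        try {
            return bytes == null ? null : ColoredUser.fromByteBuffer(ByteBuffer.wrap(bytes));
        } catch (IOException e) {
            throw new RuntimeException("An exception occurred during deserialization", e);
        }
    }
}

Note that BinaryMessageDecoder relies on the single-object header (two magic bytes plus a schema fingerprint) to look up the writer schema, so the producer has to write with toByteBuffer() or a BinaryMessageEncoder rather than a bare BinaryEncoder as in your AvroSerializer.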

CodePudding user response:

If you have to handle an evolving schema, it is a little bit tricky. Refer to the question here. One way you can handle it is by adding the writer schema to a record header and using the schema from the header while deserializing.

But the problem with that approach is that you are still sending a lot of extra data, which defeats the purpose of using something like Avro; Avro is meant to reduce the size of the data being transmitted.
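
For what it's worth, a rough sketch of that idea, assuming the producer puts its schema JSON in a record header; the header name "avro.schema" is an arbitrary choice for illustration:

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.avro.Schema;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;

public class HeaderAwareAvroDeserializer<T extends SpecificRecordBase> implements Deserializer<T> {

    private final Schema readerSchema;

    public HeaderAwareAvroDeserializer(Schema readerSchema) {
        this.readerSchema = readerSchema;
    }

    // Kafka calls this overload (available since kafka-clients 2.1), which
    // exposes the record headers to the deserializer
    @Override
    public T deserialize(String topic, Headers headers, byte[] bytes) {
        if (bytes == null) {
            return null;
        }
        try {
            Header header = headers.lastHeader("avro.schema"); // hypothetical header name
            // Fall back to the reader schema if the producer sent no header
            Schema writerSchema = header == null
                    ? readerSchema
                    : new Schema.Parser().parse(new String(header.value(), StandardCharsets.UTF_8));
            // Resolving the writer schema against the reader schema is what
            // fills in defaulted fields such as favorite_color
            DatumReader<T> datumReader = new SpecificDatumReader<>(writerSchema, readerSchema);
            return datumReader.read(null, DecoderFactory.get().binaryDecoder(bytes, null));
        } catch (IOException e) {
            throw new RuntimeException("An exception occurred during deserialization", e);
        }
    }

    @Override
    public T deserialize(String topic, byte[] bytes) {
        throw new UnsupportedOperationException("record headers are required to resolve the writer schema");
    }
}

The matching producer-side change is a single line before sending:

record.headers().add("avro.schema", user.getSchema().toString().getBytes(StandardCharsets.UTF_8));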

The best way to handle this is to use the Confluent Schema Registry. It is open source; you can run it locally in a Docker container if you want, and configure your applications to use it.
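
As a sketch, the client setup with Confluent's serde (the io.confluent:kafka-avro-serializer artifact) would look roughly like this; http://localhost:8081 is the conventional local registry address:

// Producer: KafkaAvroSerializer registers the writer schema with the
// registry and embeds its id in each message
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        io.confluent.kafka.serializers.KafkaAvroSerializer.class);
producerProps.put("schema.registry.url", "http://localhost:8081");

// Consumer: KafkaAvroDeserializer fetches the writer schema by id and
// resolves it against the schema of the generated class
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "specific-record-consumer-group");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
consumerProps.put("schema.registry.url", "http://localhost:8081");
consumerProps.put("specific.avro.reader", "true"); // deserialize into generated classes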

CodePudding user response:

But this works (using files). Presumably that is because an Avro data file stores the writer schema in its header, so DataFileReader can resolve it against the new reader schema:

public class Seriali {
    public static void main(String[] args) throws IOException {
        Schema schema = new Schema.Parser().parse(new File("D:/UserOld.avsc"));
        User user = User.newBuilder().setName("John").setFavoriteNumber(66).build();
        DatumWriter<User> datumWriter = new SpecificDatumWriter<>();
        DataFileWriter<User> dataFileWriter = new DataFileWriter<>(datumWriter);
        dataFileWriter.create(schema, new File("D:/User.txt"));
        dataFileWriter.append(user);
        dataFileWriter.close();
    }
}
public class Deseriali {
    public static void main(String[] args) throws Exception{
        Schema schema = new Schema.Parser().parse(new File("D:/UserNew.avsc"));
        DatumReader<User> datumReader = new SpecificDatumReader<>(schema);
        DataFileReader<User> dataFileReader = new DataFileReader<>(
                new File("D:/User.txt"), datumReader);
        User emp = null;
        while (dataFileReader.hasNext()) {
            emp = dataFileReader.next(emp);
            System.out.println(emp);
        }
    }
}