Kafka Streams - Serdes for Custom Objects


I'm pretty new to Kafka Streams, and overall I've found the API confusing and the documentation hard to follow. I'm writing a simple streams app like this:

  1. Input stream: key(String)->userID, value(String)->a user's transaction record JSON string.

  2. Topology: aggregate the above input into a KTable<String, UserBankBalance>. The key is the userId; the value is a custom object that is updated as the aggregation proceeds.

     final KStream<String, String> transactionsInput = streamsBuilder.stream("bank-balance-input-topic");
     final KTable<String, UserBankBalance> table =
             transactionsInput.groupBy((key, value) -> key)
                              .aggregate(() -> new UserBankBalance("dummyUserId", 0, "1866-12-23T17:47:37Z"),
                                         (userId, transactionRecordStr, userBankBalance) -> {
                                             // 1. Deserialize the transactionRecordStr (JSON)
                                             // 2. Apply the transaction to the running balance
                                             return userBankBalance;
                                         });
    

(The default key and value serdes are configured to String.) However, when running some sanity tests, I got an error saying the String serializer is not compatible with the UserBankBalance object.

  1. Why are Serdes required for operations like mapValues, groupByKey, aggregate?

    My understanding:

    • The Streams library needs to serialize the object with the default serdes in order to materialize and update its internal state?
    • If a repartition happens, the keys and values need to be serialized and written to an internal repartition topic for further processing?
    • Given the above, even if we only create the KTable<String, UserBankBalance> as an in-memory representation, serdes are still required.

    I've read the official docs and the API docs, but I just couldn't find a clear explanation.

  2. Why doesn't the Kafka Streams library provide a default ObjectMapperSerdes that utilizes Jackson's ObjectMapper (like this official example)? I imagine lots of users have similar use cases, and without it library users end up duplicating the same effort.


CodePudding user response:

Why are Serdes required for operations

Kafka stores bytes. The Streams API doesn't pass objects in memory from one operation to the next; it uses Kafka itself as the message bus, so any object needs to be serialized to bytes before it is sent over the network.

If you are working with JSON, Kafka Streams already gives you a built-in way to create a JSON Serde: the Serdes.serdeFrom static method, which wraps any Serializer/Deserializer pair, so a dedicated ObjectMapper Serde isn't needed. (It would also create a dependency on the connect-json module and bloat the kafka-streams classpath.)
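For illustration, here is a minimal sketch of such a Serde built from Jackson's ObjectMapper via Serdes.serdeFrom. The JsonSerdes class name is made up for this example; UserBankBalance is the asker's class:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.kafka.common.errors.SerializationException;
    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serdes;

    public final class JsonSerdes {
        private static final ObjectMapper MAPPER = new ObjectMapper();

        // Builds a Serde<T> from a Serializer/Deserializer lambda pair.
        public static <T> Serde<T> jsonSerde(final Class<T> clazz) {
            return Serdes.serdeFrom(
                    (topic, data) -> {               // Serializer<T>: object -> JSON bytes
                        try {
                            return data == null ? null : MAPPER.writeValueAsBytes(data);
                        } catch (final Exception e) {
                            throw new SerializationException(e);
                        }
                    },
                    (topic, bytes) -> {              // Deserializer<T>: JSON bytes -> object
                        try {
                            return bytes == null ? null : MAPPER.readValue(bytes, clazz);
                        } catch (final Exception e) {
                            throw new SerializationException(e);
                        }
                    });
        }
    }

Usage would then be a one-liner: Serde<UserBankBalance> balanceSerde = JsonSerdes.jsonSerde(UserBankBalance.class);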

Alternatively, Spring Kafka also has a JsonSerde. And Confluent maintains AvroSerde, ProtobufSerde, etc. for use with classes generated by those tools.
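For example, with spring-kafka on the classpath (an assumption; the class lives in org.springframework.kafka.support.serializer), the Spring variant is a one-liner:

    import org.springframework.kafka.support.serializer.JsonSerde;

    // Jackson-backed Serde for the asker's class, provided by Spring Kafka.
    final JsonSerde<UserBankBalance> balanceSerde = new JsonSerde<>(UserBankBalance.class);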

got String serializer not compatible with the UserBankBalance object

You need to use some combination of the Grouped, Materialized, Consumed, or Produced classes with each operation to override the default serdes in the topology.
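Applied to the topology from the question, a sketch might look like this (JsonSerdes.jsonSerde is the hypothetical helper above, and the aggregator body is elided):

    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.Grouped;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Materialized;

    final Serde<String> stringSerde = Serdes.String();
    final Serde<UserBankBalance> balanceSerde = JsonSerdes.jsonSerde(UserBankBalance.class);

    final KStream<String, String> transactionsInput =
            streamsBuilder.stream("bank-balance-input-topic",
                                  Consumed.with(stringSerde, stringSerde));       // source serdes

    final KTable<String, UserBankBalance> table =
            transactionsInput
                    .groupBy((key, value) -> key,
                             Grouped.with(stringSerde, stringSerde))              // repartition-topic serdes
                    .aggregate(() -> new UserBankBalance("dummyUserId", 0, "1866-12-23T17:47:37Z"),
                               (userId, transactionRecordStr, balance) -> balance, // update logic elided
                               Materialized.with(stringSerde, balanceSerde));     // state-store/changelog serdes

With Materialized.with supplying the value serde, the aggregate's state store and its changelog topic no longer fall back to the default String serde, which is what caused the original error.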
