I'm pretty new to Kafka Streams, and I find the API confusing overall and the documentation hard to follow. I'm writing a simple streams app like this:
Input stream: key (String) -> userId, value (String) -> a user's transaction record as a JSON string.
Topology: aggregate the above input and produce a `KTable<String, UserBankBalance>`. The key is the userId; the value is a custom object that gets updated as the aggregation proceeds.
```java
final KStream<String, String> transactionsInput =
    streamsBuilder.stream("bank-balance-input-topic");

final KTable<String, UserBankBalance> table = transactionsInput
    .groupBy((key, value) -> key)
    .aggregate(
        () -> new UserBankBalance("dummyUserId", 0, "1866-12-23T17:47:37Z"),
        (userName, transactionRecordStr, userBankBalance) -> {
            // Code does the following:
            // 1. Deserialize the transactionRecordStr
            // 2. Update the UserBankBalance object.
            return userBankBalance;
        });
```
(The default key and value serdes are configured to be String.) However, when running some sanity tests, I got an error that the String serializer is not compatible with the UserBankBalance object.
Why are Serdes required for operations like `mapValues`, `groupByKey`, and `aggregate`?
My understanding:
- The Streams library needs to materialize the object using the configured serdes to update its internal state stores?
- If a repartition happens, the keys and values need to be serialized and written to an internal repartition topic for further processing?
- Given the above, even if we just create the `KTable<String, UserBankBalance>` as an in-memory representation, the serdes are still required.
I've read the official documentation and the API docs but just couldn't find any good clarification.
Why doesn't the Kafka Streams library provide a default `ObjectMapperSerdes` that utilizes Jackson's `ObjectMapper` (like this official example)? I imagine lots of users have similar use cases, and each library user ends up duplicating the same effort.
CodePudding user response:
> Why are Serdes required for operations
Kafka stores bytes. The Streams API doesn't pass objects from one operation to the other; it uses Kafka as a message bus, so it needs to serialize any object to bytes to send it over the network.
If you are working with JSON, then Kafka Streams already has a built-in way to create a JSON Serde; there doesn't need to be an `ObjectMapper`-based one, since you can use the `Serdes.serdeFrom` static method. (Also, a built-in one would create a dependency on the `connect-json` module and bloat the `kafka-streams` classpath.)
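For illustration, here is a minimal sketch of such a Jackson-backed Serde built with `Serdes.serdeFrom` (the `JsonSerdes.forClass` helper is my own naming, not part of any library):

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

public final class JsonSerdes {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    private JsonSerdes() {}

    /** Builds a Serde for any Jackson-mappable class. */
    public static <T> Serde<T> forClass(Class<T> clazz) {
        // Serializer and Deserializer have default configure()/close()
        // methods, so both can be written as lambdas.
        Serializer<T> serializer = (topic, data) -> {
            try {
                return data == null ? null : MAPPER.writeValueAsBytes(data);
            } catch (Exception e) {
                throw new SerializationException("JSON serialization failed", e);
            }
        };
        Deserializer<T> deserializer = (topic, bytes) -> {
            try {
                return bytes == null ? null : MAPPER.readValue(bytes, clazz);
            } catch (Exception e) {
                throw new SerializationException("JSON deserialization failed", e);
            }
        };
        return Serdes.serdeFrom(serializer, deserializer);
    }
}
```

With that in place, `JsonSerdes.forClass(UserBankBalance.class)` gives you the value Serde your aggregation is missing.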
Alternatively, Spring Kafka also has a `JsonSerde`. And Confluent maintains `AvroSerde`, `ProtobufSerde`, etc., for use with classes generated by those tools.
> got String serializer not compatible with the UserBankBalance object
You need to use some combination of the `Grouped`, `Materialized`, `Consumed`, or `Produced` classes with each operation to override the default topology serdes.
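For example, here is a sketch of the question's topology with the serdes overridden explicitly; `JsonSerdes.forClass` is the hypothetical helper from above, and `applyTransaction` stands in for your deserialize-and-update logic:

```java
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

Serde<String> stringSerde = Serdes.String();
Serde<UserBankBalance> balanceSerde = JsonSerdes.forClass(UserBankBalance.class);

final KTable<String, UserBankBalance> table = transactionsInput
    // Grouped supplies the serdes for the repartition topic, if one is created
    .groupByKey(Grouped.with(stringSerde, stringSerde))
    .aggregate(
        () -> new UserBankBalance("dummyUserId", 0, "1866-12-23T17:47:37Z"),
        (userId, transactionRecordStr, balance) ->
            applyTransaction(balance, transactionRecordStr),
        // Materialized supplies the serdes for the state store backing the KTable
        Materialized.with(stringSerde, balanceSerde));
```

Note that `groupByKey` is used instead of `groupBy((k, v) -> k)`: since the key is unchanged, `groupByKey` avoids marking the stream for an unnecessary repartition.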