Home > Back-end >  Suppress KTable aggregation to intermediary topic
Suppress KTable aggregation to intermediary topic

Time:09-16

I am trying to suppress a KTable after an aggregation. It would be ideal to store them in a different topic when the time window is large (12, 24h etc) and tons of data can stack up.

KTable<Windowed<String>, Object> is the aggregation result's type.

Trying immediately after the aggregation calling suppress will cause an exception:

.suppress(Suppressed.<Windowed<String>>untilWindowCloses(Suppressed.BufferConfig.unbounded()
                            .withLoggingEnabled(Map.of())) 
                            .withName("IntermediaryAggregationsStore"))

The exception:

Caused by: java.lang.ClassCastException: class org.apache.kafka.streams.kstream.Windowed cannot be cast to class java.lang.String (org.apache.kafka.streams.kstream.Windowed is in unnamed module of loader 'app'; java.lang.String is in module java.base of loader 'bootstrap')

In the initial configuration the String serde is used and It must stay that way, until the suppressing moment.

Is there any way of handling this?

            split.branch((key, value) -> true, branchConsumer(s -> s
            //.transform(TimestampTransformer::new)
            .map((key, someObject) -> new KeyValue<>(someObject.getId(), someObject))
            .groupByKey()
            .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(1), Duration.ofMinutes(1)))
            .aggregate(NewObject::new,
                    (key, value, aggregate) -> {
                        // do the mapping between SomeObject and NewObject
                        );
                        return aggregate;
                    }
            )
            .suppress(Suppressed.<Windowed<String>>untilWindowCloses(Suppressed.BufferConfig.unbounded()
                            .withLoggingEnabled(Map.of()))
                    .withName("someName"))
        // continue with what's left to do

CodePudding user response:

aggregate function accepts a Materialized class that you can override what serializer is passed forward to the suppression.

  • Related