Spring Kafka - Added Store cannot access from stream process

Time:02-25

I'm facing an issue with Spring Kafka: my processor cannot access a state store from its process step, even though I added that store to the topology/streams builder.

Method 1:

@Component
@RequiredArgsConstructor
@EnableKafkaStreams
@Order(2)
public class TimelineVersionUpdatedStream implements EventStream {

    private static final Logger logger =
            LoggerFactory.getLogger(TimelineVersionUpdatedStream.class);

    @Autowired
    private StreamConfiguration configuration;
    @Autowired
    private TimeLineChangesCaptureService timeLineChangesCaptureService;

    @Autowired
    public void TimelineVersionUpdatedProccess(StreamsBuilder builder) {
        final Serde<String> stringSerde = Serdes.String();
        final SpecificAvroSerde<TimelineVersionUpdated> timelineVersionUpdatedSpecificAvroSerde = new SpecificAvroSerde<>();
        timelineVersionUpdatedSpecificAvroSerde.configure(getSerdeConfig(), false);

        final SpecificAvroSerde<PaymentChanged> paymentChangedSpecificAvroSerde = new SpecificAvroSerde<>();
        paymentChangedSpecificAvroSerde.configure(getSerdeConfig(), false);

        KeyValueStoreBuilder paymentStoreBuilder = new KeyValueStoreBuilder(
                Stores.persistentKeyValueStore("demo-store-2"),
                stringSerde,
                paymentChangedSpecificAvroSerde,
                new SystemTime());



        KStream<String, TimelineVersionUpdated> stream = builder.stream(
                Topics.MOS_BUDGET_TIMELINE_VERSION,
                Consumed.with(
                        stringSerde,
                        timelineVersionUpdatedSpecificAvroSerde
                ));

        StreamsBuilder stateStore = builder.addStateStore(paymentStoreBuilder);

        stream.process(new ProcessorSupplier<>() {
            @Override
            public Processor<String, TimelineVersionUpdated> get() {
                return new Processor<>() {

                    private ProcessorContext context;
                    private KeyValueStore<String, PaymentChanged> store;

                    @Override
                    public void init(ProcessorContext processorContext) {
                        this.context = processorContext;
                        this.store = context.getStateStore("demo-store-2");
                    }

                    @Override
                    public void process(String s, TimelineVersionUpdated timelineVersionUpdated) {
                        // Check for null before logging; calling toString() on a null value would throw.
                        if (timelineVersionUpdated == null) {
                            return;
                        }
                        logger.info("TimelineVersionUpdatedStream.TimelineVersionUpdatedProccess record key {} value {}", s, timelineVersionUpdated);
                        timeLineChangesCaptureService.captureTimeLineChanges(timelineVersionUpdated, store);
                    }

                    @Override
                    public void close() {
                    }
                };
            }
        });


        Topology topology = builder.build();
        logger.info("{}", topology.describe().toString());
    }

When I run the above code, I get the following exception:

org.apache.kafka.streams.errors.StreamsException: failed to initialize processor KSTREAM-PROCESSOR-0000000001
    at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:127) ~[kafka-streams-2.7.2.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamTask.initializeTopology(StreamTask.java:879) ~[kafka-streams-2.7.2.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamTask.completeRestoration(StreamTask.java:234) ~[kafka-streams-2.7.2.jar:na]
    at org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:494) ~[kafka-streams-2.7.2.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:754) ~[kafka-streams-2.7.2.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:636) ~[kafka-streams-2.7.2.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:564) ~[kafka-streams-2.7.2.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:523) ~[kafka-streams-2.7.2.jar:na]
Caused by: org.apache.kafka.streams.errors.StreamsException: Processor KSTREAM-PROCESSOR-0000000001 has no access to StateStore demo-store-2 as the store is not connected to the processor. If you add stores manually via '.addStateStore()' make sure to connect the added store to the processor by providing the processor name to '.addStateStore()' or connect them via '.connectProcessorAndStateStores()'. DSL users need to provide the store name to '.process()', '.transform()', or '.transformValues()' to connect the store to the corresponding operator, or they can provide a StoreBuilder by implementing the stores() method on the Supplier itself. If you do not add stores manually, please file a bug report at https://issues.apache.org/jira/projects/KAFKA.
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:149) ~[kafka-streams-2.7.2.jar:na]
    at au.com.mybudget.mos.mostimelinekafkaetl.transport.stream.TimelineVersionUpdatedStream$1$1.init(TimelineVersionUpdatedStream.java:92) ~[classes/:na]
    at org.apache.kafka.streams.processor.internals.ProcessorAdapter.init(ProcessorAdapter.java:57) ~[kafka-streams-2.7.2.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$init$0(ProcessorNode.java:120) ~[kafka-streams-2.7.2.jar:na]
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883) ~[kafka-streams-2.7.2.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:117) ~[kafka-streams-2.7.2.jar:na]
    ... 7 common frames omitted

Then I tried adding the store through a StreamsBuilderFactoryBeanCustomizer instead (Method 2):

    @Bean
    public StreamsBuilderFactoryBeanCustomizer customizer() {
        final Serde<String> stringSerde = Serdes.String();
        final SpecificAvroSerde<PaymentChanged> paymentChangedSpecificAvroSerde = new SpecificAvroSerde<>();
        paymentChangedSpecificAvroSerde.configure(Collections.singletonMap(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, getSchemaRegistryUrl()), false);

        return factoryBean -> {
            try {
                final StreamsBuilder streamsBuilder = factoryBean.getObject();
                streamsBuilder.addStateStore(Stores.keyValueStoreBuilder(
                        Stores.persistentKeyValueStore("store-demo-3"),
                        stringSerde,
                        paymentChangedSpecificAvroSerde
                ));
            } catch (Exception e) {
                logger.error("StreamsBuilderFactoryBeanCustomizer exception:{}", e.getMessage());
            }
        };
    }

I then tried to access that store from the processor, but I ended up with the same exception.

Please help me understand the issue.

Answer:

Adding a state store to a Topology is just the first step; on its own it does not make the store available to any processor. For a Processor to use a state store, you must also connect the two.

The simplest way is to pass in the state store name when adding the Processor:

stream.process(..., "storeName");
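
As a minimal sketch of that fix, the example below registers a store and then passes the same name to process(), which is what connects the two. It assumes Kafka Streams 2.7.x (the version in the stack trace) and the older org.apache.kafka.streams.processor API used in the question. The class name ConnectedStoreSketch, the topic "input-topic", and the String value type are placeholders for illustration only; the question's Avro serdes and Spring wiring are left out to keep it self-contained.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

// Hypothetical class for illustration; not part of the original question.
public class ConnectedStoreSketch {

    static final String STORE_NAME = "demo-store-2";

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // Step 1: register the store with the builder (String values stand in for Avro here).
        StoreBuilder<KeyValueStore<String, String>> storeBuilder =
                Stores.keyValueStoreBuilder(
                        Stores.persistentKeyValueStore(STORE_NAME),
                        Serdes.String(),
                        Serdes.String());
        builder.addStateStore(storeBuilder);

        // "input-topic" is a placeholder topic name.
        KStream<String, String> stream =
                builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));

        ProcessorSupplier<String, String> supplier = () -> new Processor<String, String>() {
            private KeyValueStore<String, String> store;

            @Override
            @SuppressWarnings("unchecked")
            public void init(ProcessorContext context) {
                // This lookup only succeeds if the store is connected to this processor.
                store = (KeyValueStore<String, String>) context.getStateStore(STORE_NAME);
            }

            @Override
            public void process(String key, String value) {
                if (value != null) {
                    store.put(key, value);
                }
            }

            @Override
            public void close() {
            }
        };

        // Step 2: pass the store name to process() so the store is connected to the processor.
        stream.process(supplier, STORE_NAME);

        return builder.build();
    }

    public static void main(String[] args) {
        // Print the topology so the processor-to-store connection can be inspected.
        System.out.println(buildTopology().describe());
    }
}
```

The same applies to Method 2 in the question: adding the store through a StreamsBuilderFactoryBeanCustomizer registers it, but the processor still needs the store name in its process() call. As the exception message itself notes, an alternative is to implement stores() on the supplier and return the StoreBuilder there, in which case a separate addStateStore() call should not be needed.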