Home > database >  Kafka consumer didn't consume
Kafka consumer didn't consume

Time:10-12

I'm trying to create as app that will consume a Kafka topic with 10 partitions. I didn't define the number of partitions of the consumer at application.properties because the app will be deployed in OpenShift, and I want that the OpenShift do the load balance. The problem is that when I'm start the application it didn't consume anything. The code is very simple, and I really don't know whats wrong:

Here is the consumer, the log shows nothing:

    @Incoming("plac-fate")
    @Transactional
    public void consume(NFeDistSVBAPayload payload) throws PlacException {
        logger.info("Consumindo payload: "   payload);
        service.criaNFeDistSVBAEntity(payload);
        logger.info("Payload com nrProtocolo '"   payload.getNrProtocolo()   "' consumido com sucesso.");
    }

I also have as deserializer its log didn't show anything:

@Override
    public NFeDistSVBAPayload deserialize(String topic, byte[] data) {
        logger.info("Deserializando mensagem do topico: "   topic);
        var strPayload = new String(data, StandardCharsets.UTF_8);
        var module = new JaxbAnnotationModule();
        var mapper = JsonMapper.builder()
        .enable(JsonReadFeature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER)
        .build();
        mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
        mapper.registerModule(module);
        try {
            var payload =  mapper.readValue(strPayload, NFeDistSVBAPayload.class);
            return payload;
        } catch (JsonProcessingException e) {
            logger.error(e);
            return null;
        }
    }

and here is the àpplication.properties:

quarkus.kafka.devservices.enabled=false
kafka.bootstrap.servers=${PLAC_KAFKA_URL}
mp.messaging.incoming.plac-fate.connector=smallrye-kafka
mp.messaging.incoming.plac-fate.value.deserializer=br.gov.pr.fazenda.plac.dominio.utils.DistNFePayloadSVBADeserializer
mp.messaging.incoming.plac-fate.group.id=plac-fate-consumer

and here is the app log:

__  ____  __  _____   ___  __ ____  ______ 
 --/ __ \/ / / / _ | / _ \/ //_/ / / / __/
 -/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/
2022-10-11 18:53:42,919 INFO  [io.sma.rea.mes.kafka] (Quarkus Main Thread) SRMSG18229: Configured topics for channel 'plac-fate': [plac-fate]
2022-10-11 18:53:42,949 INFO  [io.sma.rea.mes.kafka] (Quarkus Main Thread) SRMSG18214: Key deserializer omitted, using String as default
2022-10-11 18:53:43,764 INFO  [io.sma.rea.mes.kafka] (smallrye-kafka-consumer-thread-0) SRMSG18257: Kafka consumer kafka-consumer-plac-fate, connected to Kafka brokers 'kafka:9092', belongs to the 'plac-fate-consumer' consumer group and is configured to poll records from [plac-fate]
2022-10-11 18:53:43,873 INFO  [io.quarkus] (Quarkus Main Thread) consumer 1.0-SNAPSHOT on JVM (powered by Quarkus 2.12.3.Final) started in 12.193s. Listening on: http://localhost:8080
2022-10-11 18:53:43,873 INFO  [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
2022-10-11 18:53:43,874 INFO  [io.quarkus] (Quarkus Main Thread) Installed features: [agroal, cdi, hibernate-orm, hibernate-orm-panache, jdbc-oracle, kafka-client, micrometer, narayana-jta, qute, smallrye-context-propagation, smallrye-reactive-messaging, smallrye-reactive-messaging-kafka, vertx]

As you can see the log says that the consumer was created and connected but it does nothing.

CodePudding user response:

Consumers default to start at the very end of the topic (where there is nothing to consume)

If you want to start at the beginning of the topic and read existing events, you'll need to add

mp.messaging.incoming.{channel-name}.auto.offset.reset=earliest
  • Related