Home > Back-end >  Read the data from @KafkaListener when user hits GET request using Spring boot and spring Kafka
Read the data from @KafkaListener when user hits GET request using Spring boot and spring Kafka

Time:04-20

@Service
public class KafkaConsumer {
    
    @KafkaListener(id = "Id", topics = "topic1")
    public void listen(String msg) {
        System.out.println("Key - "   msg);
    }
}

@RestController
public class GetKafkaData {

    @RequestMapping(method = RequestMethod.GET, value = "/api/get")
    public String getKafkaListnerData() {
        // Some code that need to write to get data from kafka Listner
    }
}

Explanation of the above classes.

  1. KafkaConsumer - Listen to the data from the Kafka
  2. GetKafkaData - Controller which gets the listened data from Kafka.

I'm not adding my Producer and Other Controller which is basically publishing data to Kafka.

So my question is once I hit the localhost:8080/api/get API it should call the listen() method from the KafkaConsumer Class and returns the response as the consumed data from the kafka.

In the above case listen() method gets called once you published data into Kafka from the producer.

I'm using the following libraries for development.

1. Spring boot

2. Spring Kafka

CodePudding user response:

What you describe that you need is not possible from logical perspective.

@KafkaListener(id = "Id", topics = "topic1")
    public void listen(String msg) {
        System.out.println("Key - "   msg);
    }

The point of this method is to be called, when kafka triggers the listener because of some published data in topic1. Actually the kafka listener does an infinite loop around poll, so it is constantly looking for published data, and if there are (found with poll), then it tries to read them.

So a background thread runs which constantly does the poll and invokes your method listen when needed.

As you understand this is an asynchronous process which can't be served per client request on some other method, since the originating logical component is the scheduler which does the background job and then calls your method listen.

So to your logical requirements, the @KafkaListener is not providing any help.

You could try to create your own KafkaConsumer and manually poll the kafka cluster when the client makes a get request to your method getKafkaListnerData. Check here a very simple example of how you can manually create the consumer.

Here is also the documetation for KafkaConsumer where you can see what options are available for you to use for your manual handling. Probably you will need some specific subscription and then you should use a single poll to extract the data that you want. Don't forget to close the consumer when you don't need it any more since otherwise it would lead to leak of resources.

CodePudding user response:

You can use the Application context interface and use the BindingService to bind/unbind topics from kafka. This methods works with Kafka-streams

@Component
public class ConsumerService {

    @Autowired
    private ApplicationContext applicationContext;

    public void start(String channelName) {

        final BindingService bindingService = this.applicationContext.getBean(BindingService.class);

        final Map<String, SubscribableChannel> subscribableChannelMap = this.applicationContext
                .getBeansOfType(SubscribableChannel.class);

        bindingService.unbindConsumers(channelName);
        bindingService.bindConsumer(subscribableChannelMap.get(channelName), channelName);

    }

    public void stop(String channelName)  {

        final BindingService bindingService = this.applicationContext.getBean(BindingService.class);

        bindingService.unbindConsumers(channelName);

    }
}

  • Related