How can we call a remote service (i.e. REST API based) for each Kafka message received at Kafka consum

Time:06-24

I am reading a file line by line in a Kafka producer and sending each line as a record to a topic; a Kafka consumer receives these records. From the consumer I need to send each record to a REST API service for processing, and the response should then be published to another topic, and so on. I am not sure how to do this with the Kafka producer/consumer API. Any help is appreciated.

CodePudding user response:

It is fairly simple. One way is to build a Spring Boot application with a Kafka consumer client that calls the backend REST API for each record it receives.

  1. A class to instantiate a consumer object

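package x.x.x.x.x;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Service
public class KAFKAConsumer implements Runnable {

    private static final Logger LOGGER = LoggerFactory.getLogger(KAFKAConsumer.class);

    // Assumption: topic name, broker address and group id are placeholders for your environment.
    private final String topicName = "input-topic";

    private Properties consumerProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "rest-caller");
        props.put("enable.auto.commit", "false"); // offsets are committed manually below
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    @Override
    public void run() {
        // try-with-resources closes the consumer if the loop ever exits
        try (Consumer<String, String> consumer = new KafkaConsumer<>(consumerProperties())) {
            consumer.subscribe(Collections.singleton(topicName));
            while (true) {
                try {
                    LOGGER.debug("Polling topic for new events...");
                    ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(5000));
                    for (TopicPartition partition : consumerRecords.partitions()) {
                        List<ConsumerRecord<String, String>> partitionRecords = consumerRecords.records(partition);
                        for (ConsumerRecord<String, String> record : partitionRecords) {

                            // Calling REST API for each record (method from step 2).
                            // Passing record.value() is an assumption; pass whatever your REST client needs.
                            invokeRestAPIClass(record.value());

                            try {
                                // Commit only this record: the committed offset is the next one to read
                                consumer.commitSync(Collections.singletonMap(
                                        partition, new OffsetAndMetadata(record.offset() + 1)));
                            } catch (CommitFailedException e) {
                                LOGGER.warn("Offset commit failed; the record may be redelivered", e);
                            }
                        }
                    }
                } catch (Exception e) {
                    // Logged and skipped: the rest of this batch is not re-read until a restart or rebalance
                    LOGGER.error("Failed to process polled records", e);
                }
            }
        }
    }
}
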
  2. A class to define the REST API template

You can use https://howtodoinjava.com/spring-boot2/resttemplate/spring-restful-client-resttemplate-example/ as a reference to build a REST client with RestTemplate. Put the call in invokeRestAPIClass(Parameters....) and map the request and response through POJO classes that match the REST API's request and response structure.
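
As a rough illustration of what invokeRestAPIClass could look like, here is a minimal sketch. It uses the JDK's built-in java.net.http.HttpClient (Java 11+) rather than RestTemplate, only so that it runs without Spring on the classpath. The class name RestInvoker, the buildRequest helper, the plain-text content type and the single String parameter are all assumptions for the example, not something the answer specifies.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

/** Hypothetical REST client; name, endpoint and payload format are illustrative. */
final class RestInvoker {

    private final HttpClient client = HttpClient.newBuilder()
            .connectTimeout(Duration.ofSeconds(5))
            .build();
    private final String endpointUrl;

    RestInvoker(String endpointUrl) {
        this.endpointUrl = endpointUrl;
    }

    /** Builds a POST request that carries one Kafka record value as the body. */
    static HttpRequest buildRequest(String endpointUrl, String recordValue) {
        return HttpRequest.newBuilder(URI.create(endpointUrl))
                .timeout(Duration.ofSeconds(10))
                .header("Content-Type", "text/plain; charset=utf-8")
                .POST(HttpRequest.BodyPublishers.ofString(recordValue))
                .build();
    }

    /** Sends the record to the REST service and returns the response body. */
    String invokeRestAPIClass(String recordValue) throws IOException, InterruptedException {
        HttpResponse<String> response = client.send(
                buildRequest(endpointUrl, recordValue),
                HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() / 100 != 2) {
            // Treat any non-2xx status as a failure so the caller can skip the commit
            throw new IOException("REST call failed with HTTP " + response.statusCode());
        }
        return response.body();
    }
}
```

Throwing on a non-2xx status lets the consumer loop decide whether to commit the offset or leave the record for redelivery. With RestTemplate the shape would be similar, with the POJO mapping handled by its message converters.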

  3. Implementing a Spring Boot startup main class

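package x.x.x.x.x;

import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.EnableAspectJAutoProxy;

@SpringBootApplication(scanBasePackages = "x.x.x.x.x")
@EnableAspectJAutoProxy
public class SpringBootStartup implements CommandLineRunner {

    private final KAFKAConsumer consumer;

    // Spring injects the KAFKAConsumer bean through the constructor
    public SpringBootStartup(KAFKAConsumer consumer) {
        this.consumer = consumer;
    }

    @Override
    public void run(String... args) {
        // Run the polling loop on its own thread so application startup is not blocked
        Thread consumerThread = new Thread(consumer, "kafka-consumer");
        consumerThread.start();
    }

    public static void main(String[] args) {
        SpringApplication.run(SpringBootStartup.class, args);
    }

}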
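
The question also asks for the REST response to be sent on to another topic, which the three steps above do not show. The following is only a sketch of that glue, written against plain JDK functional interfaces so it can run on its own. RecordForwarder, its constructor parameters and the topic name "responses" are hypothetical names for illustration.

```java
import java.util.function.BiConsumer;
import java.util.function.Function;

/** Hypothetical glue between the consumer loop, the REST call and the output topic. */
final class RecordForwarder {

    private final Function<String, String> restCall;      // record value -> REST response
    private final BiConsumer<String, String> publisher;   // (topic, value) -> send to Kafka
    private final String outputTopic;

    RecordForwarder(Function<String, String> restCall,
                    BiConsumer<String, String> publisher,
                    String outputTopic) {
        this.restCall = restCall;
        this.publisher = publisher;
        this.outputTopic = outputTopic;
    }

    /** Calls the REST service for one record and publishes the response; returns true on success. */
    boolean forward(String recordValue) {
        try {
            String response = restCall.apply(recordValue);
            publisher.accept(outputTopic, response);
            return true;
        } catch (RuntimeException e) {
            // On failure the caller should not commit the offset for this record
            return false;
        }
    }
}
```

In a real application the publisher would typically wrap a KafkaProducer, for example (topic, value) -> producer.send(new ProducerRecord<>(topic, value)), and the consumer loop would commit the offset only when forward returns true.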
