I am reading a file line by line in a Kafka producer and sending each line as a record to a topic, where a Kafka consumer receives it. From the consumer, I then need to send each record to a REST API service for processing, and the response should be sent to another topic, and so on. I am not sure how to do this with the Kafka producer/consumer API. Any help would be appreciated.
CodePudding user response:
This is fairly straightforward. Below is one way to build a Spring Boot application with a Kafka consumer client that calls a backend REST API.
- A class to instantiate a consumer object
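package x.x.x.x.x;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SerializationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Service
public class KAFKAConsumer implements Runnable {
    private static final Logger LOGGER = LoggerFactory.getLogger(KAFKAConsumer.class);
    // Placeholder values below (topic, brokers, group id); replace them with your own
    private final String topicName = "input-topic";

    @Override
    public void run() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "rest-forwarder");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "false"); // offsets are committed manually below
        // try-with-resources closes the consumer when the loop exits
        try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singleton(topicName));
            while (true) {
                try {
                    LOGGER.debug("Polling topic for new events...");
                    ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(5000));
                    for (TopicPartition partition : consumerRecords.partitions()) {
                        List<ConsumerRecord<String, String>> partitionRecords = consumerRecords.records(partition);
                        for (ConsumerRecord<String, String> record : partitionRecords) {
                            // Calling REST API for each record (placeholder, see the next step)
                            invokeRestAPIClass(Parameters....);
                            try {
                                // Commit only this record; a bare commitSync() would commit the whole polled batch
                                consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(record.offset() + 1)));
                            } catch (CommitFailedException e) {
                                LOGGER.warn("Offset commit failed; the record may be redelivered", e);
                            }
                        }
                    }
                } catch (SerializationException e1) {
                    LOGGER.error("Could not deserialize a record", e1);
                } catch (Exception e2) {
                    LOGGER.error("Error while processing records", e2);
                }
            }
        } catch (Exception e) {
            LOGGER.error("Kafka consumer could not be started", e);
        }
    }
}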
- A class to define REST API Template
You can use https://howtodoinjava.com/spring-boot2/resttemplate/spring-restful-client-resttemplate-example/ as a reference for building a RestTemplate-based REST client. Implement invokeRestAPIClass(Parameters....) there, and map the request and response through POJO classes that match the REST API's request and response structure.
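As a rough illustration of what invokeRestAPIClass might do, here is a minimal sketch. To keep it runnable without Spring, it swaps RestTemplate for the JDK's built-in java.net.http.HttpClient (Java 11+). The class name RestApiInvoker, the endpoint URL passed to it, the timeouts and the text/plain content type are all assumptions for illustration, not part of the original answer.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

// Hypothetical helper standing in for invokeRestAPIClass(...)
class RestApiInvoker {
    private final HttpClient client = HttpClient.newBuilder()
            .connectTimeout(Duration.ofSeconds(5))
            .build();
    private final URI endpoint;

    RestApiInvoker(String endpointUrl) {
        this.endpoint = URI.create(endpointUrl);
    }

    // Builds a POST request that carries one record value (one file line) as the body
    HttpRequest buildRequest(String recordValue) {
        return HttpRequest.newBuilder(endpoint)
                .timeout(Duration.ofSeconds(10))
                .header("Content-Type", "text/plain")
                .POST(HttpRequest.BodyPublishers.ofString(recordValue))
                .build();
    }

    // Sends the record and returns the response body; any non-2xx status is treated as a failure
    String invoke(String recordValue) throws IOException, InterruptedException {
        HttpResponse<String> response =
                client.send(buildRequest(recordValue), HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() / 100 != 2) {
            throw new IOException("REST call failed with HTTP " + response.statusCode());
        }
        return response.body();
    }
}
```

With RestTemplate, the send step would typically collapse to one call such as restTemplate.postForObject(url, recordValue, String.class), with POJO classes in place of the raw strings.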
- Implementing a Spring Boot Startup main class
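package x.x.x.x.x;

import javax.inject.Inject; // jakarta.inject.Inject on Spring Boot 3
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.EnableAspectJAutoProxy;

@SpringBootApplication(scanBasePackages = "x.x.x.x.x")
@EnableAspectJAutoProxy
public class SpringBootStartup implements CommandLineRunner {
    @Inject
    private KAFKAConsumer consumer;

    @Override
    public void run(String... args) {
        // Run the polling loop on its own thread so that application startup is not blocked
        Thread springBootThread = new Thread(consumer, "kafka-consumer");
        springBootThread.start();
    }

    public static void main(String[] args) throws Exception {
        SpringApplication.run(SpringBootStartup.class, args);
    }
}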
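The question also asks for the REST response to be sent on to another topic, which the steps above do not cover. One possible shape for that glue is sketched below with plain Java functional interfaces, so it can be tried without a broker. RecordForwarder, its constructor parameters and the topic name are hypothetical names introduced for illustration.

```java
import java.util.function.BiConsumer;
import java.util.function.Function;

// Hypothetical glue class: calls the REST API for one record and forwards the response
class RecordForwarder {
    private final Function<String, String> restCall;     // record value -> response body
    private final BiConsumer<String, String> publisher;  // (topic, value) -> publish
    private final String outputTopic;

    RecordForwarder(Function<String, String> restCall,
                    BiConsumer<String, String> publisher,
                    String outputTopic) {
        this.restCall = restCall;
        this.publisher = publisher;
        this.outputTopic = outputTopic;
    }

    // Returns true only if the REST call succeeded and the response was handed to the publisher
    boolean handle(String recordValue) {
        try {
            String response = restCall.apply(recordValue);
            publisher.accept(outputTopic, response);
            return true;
        } catch (RuntimeException e) {
            // Leave the decision (retry, skip, dead-letter topic) to the caller
            return false;
        }
    }
}
```

In the consumer loop, the publisher could be a lambda such as (topic, value) -> producer.send(new ProducerRecord<>(topic, value)), where producer is a KafkaProducer<String, String> you create yourself. Committing the offset only when handle returns true keeps a failed record eligible for redelivery after a restart or rebalance.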