I am trying to read data from Kafka into Flink. I use FlinkKafkaConsumer, but IntelliJ shows it as deprecated, and the SSH console in Google Cloud gives this error: object connectors is not a member of package org.apache.flink.streaming. How can I rewrite the code so that it works?
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
val topic = "test"
val server = "localhost:9092"
val properties = new Properties()
properties.setProperty("bootstrap.servers", server)
properties.setProperty("group.id", "test")
val inputStream = env.addSource(new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), properties))
val crimes = inputStream
  .filter(s => !s.startsWith("ID"))
  .map(_.split(","))
  .map(array => CrimeEvent(
    date = array(1),
    category = IUCRToCategory.getOrElse(array(2), "unrecognized"),
    district = array(5),
    ifArrest = BooleanToInt(array(3) == "True"),
    domestic = BooleanToInt(array(4) == "True"),
    ifFBI = IUCRToFBI.getOrElse(array(2), 0)
  ))
CodePudding user response:
Flink's FlinkKafkaConsumer has indeed been deprecated and replaced by KafkaSource. You can find the JavaDocs for the current stable version (Flink 1.15 at the time of writing) at https://nightlies.apache.org/flink/flink-docs-stable/api/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.html
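The deprecation warning by itself would not stop the code from compiling. The "object connectors is not a member of package org.apache.flink.streaming" error usually means the Kafka connector jar is not on the compile classpath, since the connector does not ship with the core Flink distribution. A minimal build.sbt sketch, assuming an sbt project on Flink 1.15 (the version number and the "provided" scope are assumptions; match them to your cluster and packaging):

```scala
// build.sbt (sketch). flinkVersion is an assumption; use the version your cluster runs.
val flinkVersion = "1.15.0"

libraryDependencies ++= Seq(
  // Scala DataStream API; has a Scala-suffixed artifact, hence %%
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
  // Kafka connector; as of Flink 1.15 the artifact has no Scala suffix, hence a single %
  "org.apache.flink" % "flink-connector-kafka" % flinkVersion
)
```

For Flink 1.14 and earlier the connector artifact carries a Scala suffix, so %% would be used for it as well. The connector also has to reach the cluster at runtime, for example by bundling it into the job's fat jar.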
CodePudding user response:
FlinkKafkaConsumer is deprecated; you can use KafkaSource instead. Below is the equivalent code in Scala:
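import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer

// Reads each record's value as a String, starting from the earliest offset
val source = KafkaSource.builder[String]()
  .setBootstrapServers("localhost:9092")
  .setTopics("topic")
  .setGroupId("grp")
  .setStartingOffsets(OffsetsInitializer.earliest())
  .setValueOnlyDeserializer(new SimpleStringSchema())
  .build()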
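A KafkaSource is attached with env.fromSource rather than env.addSource, and fromSource also expects a watermark strategy and a source name. The following is a minimal sketch of how the pieces might fit together, assuming Flink 1.15 with the Scala DataStream API and the Kafka connector on the classpath. KafkaSourceSketch, buildSource, the "Kafka Source" label and the job name are hypothetical names chosen for illustration; the topic and group id are taken from the question.

```scala
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala._

// Hypothetical wrapper object, for illustration only
object KafkaSourceSketch {

  // Builds the source; constructing it does not contact the broker
  def buildSource(servers: String, topic: String, groupId: String): KafkaSource[String] =
    KafkaSource.builder[String]()
      .setBootstrapServers(servers)
      .setTopics(topic)
      .setGroupId(groupId)
      .setStartingOffsets(OffsetsInitializer.earliest())
      .setValueOnlyDeserializer(new SimpleStringSchema())
      .build()

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val source = buildSource("localhost:9092", "test", "test")

    // fromSource replaces addSource; noWatermarks means no event-time handling
    val inputStream: DataStream[String] =
      env.fromSource(source, WatermarkStrategy.noWatermarks[String](), "Kafka Source")

    // Same header filter as in the question; the CrimeEvent mapping would follow here
    inputStream.filter(s => !s.startsWith("ID")).print()

    env.execute("kafka-source-sketch")
  }
}
```

The resulting inputStream is a DataStream[String], so the filter and map chain from the question should be able to stay as it is.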