What should I use instead of the deprecated FlinkKafkaConsumer? Scala Flink

Time:05-12

I am trying to read data from Kafka into Flink using FlinkKafkaConsumer, but IntelliJ marks it as deprecated, and the SSH console in Google Cloud reports this error: object connectors is not a member of package org.apache.flink.streaming. How can I rewrite the code so that it works?

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

val topic = "test"
val server = "localhost:9092"

val properties = new Properties()
properties.setProperty("bootstrap.servers", server)
properties.setProperty("group.id", "test")

val inputStream = env.addSource(new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), properties))

val crimes = inputStream.filter(s => !s.startsWith("ID")).map(_.split(","))
      .map(array => CrimeEvent(
        date=array(1),
        category=IUCRToCategory.getOrElse(array(2), "unrecognized"),
        district=array(5),
        ifArrest=BooleanToInt(array(3) == "True"),
        domestic=BooleanToInt(array(4) == "True"),
        ifFBI=IUCRToFBI.getOrElse(array(2), 0)
      ))

CodePudding user response:

Flink's FlinkKafkaConsumer has indeed been deprecated and replaced by KafkaSource. You can find the JavaDocs for the current stable version (Flink 1.15 at this moment) at https://nightlies.apache.org/flink/flink-docs-stable/api/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.html
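
The compile error in the question (object connectors is not a member of package org.apache.flink.streaming) typically means the Kafka connector JAR is not on the compile classpath on that machine. KafkaSource ships in the same flink-connector-kafka artifact, so the dependency is needed either way. A minimal sketch of the dependency declaration, assuming the project is built with sbt and targets Flink 1.15.0 (both are assumptions; the question does not state the build tool or version):

```scala
// build.sbt (assumed build tool; use the equivalent coordinates for Maven or Gradle)
// Assumption: Flink 1.15.x, where the Kafka connector artifact carries no Scala suffix.
// Earlier Flink versions publish it with a Scala suffix, e.g. flink-connector-kafka_2.12.
val flinkVersion = "1.15.0"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
  "org.apache.flink" %  "flink-connector-kafka" % flinkVersion
)
```

The connector is deliberately not marked "provided" here, since it is usually not part of the Flink distribution and has to be packaged into the job JAR.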

CodePudding user response:

FlinkKafkaConsumer is deprecated; use KafkaSource instead. Below is the equivalent code in Scala:

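    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.connector.kafka.source.KafkaSource
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer

    // Build a KafkaSource that reads String values, starting from the earliest offset
    val source = KafkaSource.builder[String]
      .setBootstrapServers("localhost:9092")
      .setTopics("topic")
      .setGroupId("grp")
      .setStartingOffsets(OffsetsInitializer.earliest)
      .setValueOnlyDeserializer(new SimpleStringSchema)
      .build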
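
A KafkaSource is attached to the job with env.fromSource rather than env.addSource, and fromSource also expects a WatermarkStrategy and a source name. The following is a rough sketch of how the source could be wired into the job from the question, not a definitive implementation. The object name KafkaSourceJob, the helper buildSource, the source name "Kafka Source", and the job name are made up for illustration; the topic, server, and group id are taken from the question. It assumes no event-time processing is needed, hence noWatermarks.

```scala
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala._

// Hypothetical object and helper names, used only for this example.
object KafkaSourceJob {

  // Builds a KafkaSource that deserializes record values as Strings.
  def buildSource(servers: String, topic: String, groupId: String): KafkaSource[String] =
    KafkaSource.builder[String]()
      .setBootstrapServers(servers)
      .setTopics(topic)
      .setGroupId(groupId)
      .setStartingOffsets(OffsetsInitializer.earliest())
      .setValueOnlyDeserializer(new SimpleStringSchema())
      .build()

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // fromSource takes the place of env.addSource(new FlinkKafkaConsumer(...)).
    // Assumption: event time is not used, so no watermarks are generated.
    val inputStream: DataStream[String] = env.fromSource(
      buildSource("localhost:9092", "test", "test"),
      WatermarkStrategy.noWatermarks[String](),
      "Kafka Source")

    // Same header filter as in the question; the CrimeEvent mapping would follow here.
    inputStream.filter(s => !s.startsWith("ID")).print()

    env.execute("kafka-source-example")
  }
}
```

The rest of the pipeline from the question (the split and the map to CrimeEvent) can stay unchanged, because inputStream is still a DataStream[String].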