How to use Flink's KafkaSource with Scala in 2022

Time:10-06

I've looked at this similar but 7-year-old question, but it does not apply to newer Flink versions.

I'm trying to get a simple Flink Kafka job running. I have tried several version combinations and get different compile errors with each. I'm using sbt to manage my dependencies:

val flinkDependencies = Seq(
  "org.apache.flink" %% "flink-clients" % flinkVersion,
  "org.apache.flink" %% "flink-scala" % flinkVersion,
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
  "org.apache.flink" %% "flink-connector-kafka" % flinkVersion
)

Versions tried:
scala 2.11.12 and 2.12.15
flink 1.14.6

The code I'm trying to compile (relevant bits):

import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource

...

  val env = ExecutionEnvironment.getExecutionEnvironment
  val kafkaConsumer = new KafkaSource.builder[String]
    .setBootstrapservers("localhost:9092")
    .setGroupId("flink")
    .setTopics("test")
    .build()

  val text = env.fromSource(kafkaConsumer)

I did not find an official example confirming that this is how one is supposed to use the KafkaSource, but I found this setup here and here. To my still very new Java eyes, it looks aligned with the API docs. However, I can't get it to compile with either Scala version:

[error] somepathwithmyfile: type builder is not a member of object org.apache.flink.connector.kafka.source.KafkaSource
[error]     val kafkaConsumer = new KafkaSource.builder[String]
[error]                                         ^
[error] somepathwithmyfile: value fromSource is not a member of org.apache.flink.api.scala.ExecutionEnvironment
[error]     val text = env.fromSource(kafkaConsumer)
[error]                    ^
[error] two errors found

Answer:

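For the first problem, drop the new. builder is a static factory method on KafkaSource, not a type, so it is called rather than instantiated: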

 val kafkaConsumer = KafkaSource.builder[String]
   ...

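For the second problem, fromSource is a method of StreamExecutionEnvironment (the batch ExecutionEnvironment named in your error message does not have it), and it requires three arguments: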

  /** Create a DataStream using a [[Source]]. */
  @Experimental
  def fromSource[T: TypeInformation](
      source: Source[T, _ <: SourceSplit, _],
      watermarkStrategy: WatermarkStrategy[T],
      sourceName: String): DataStream[T] = {

    val typeInfo = implicitly[TypeInformation[T]]
    asScalaStream(javaEnv.fromSource(source, watermarkStrategy, sourceName, typeInfo))
  }
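
Putting both fixes together, a minimal sketch of the job could look like the following. It assumes Flink 1.14.x with Scala 2.12 and the sbt dependencies from the question. The object name KafkaSourceJob, the helper buildSource, the source name "Kafka Source" and the job name are made up for illustration. The starting offsets, the value-only deserializer and the no-watermarks strategy are choices made for this sketch and are not taken from the question. SimpleStringSchema is imported from org.apache.flink.api.common.serialization here instead of the older streaming.util.serialization package.

```scala
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
// Brings StreamExecutionEnvironment, DataStream and the implicit
// TypeInformation instances required by fromSource into scope.
import org.apache.flink.streaming.api.scala._

object KafkaSourceJob {

  // Hypothetical helper: builds a KafkaSource that reads record values as strings.
  def buildSource(servers: String, topic: String, groupId: String): KafkaSource[String] =
    KafkaSource.builder[String]()                         // static factory method, no `new`
      .setBootstrapServers(servers)
      .setTopics(topic)
      .setGroupId(groupId)
      .setStartingOffsets(OffsetsInitializer.earliest())  // assumption: read from the beginning
      .setValueOnlyDeserializer(new SimpleStringSchema()) // decode the record value as a String
      .build()

  def main(args: Array[String]): Unit = {
    // The streaming environment, not the batch ExecutionEnvironment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val source = buildSource("localhost:9092", "test", "flink")

    // fromSource takes the source, a watermark strategy and a source name.
    val text: DataStream[String] =
      env.fromSource(source, WatermarkStrategy.noWatermarks[String](), "Kafka Source")

    text.print()
    env.execute("Kafka source example")
  }
}
```

If the job needs event-time processing, the noWatermarks strategy would have to be replaced with one that fits the data.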

Also note that Flink does not (yet) support Scala 2.12.15; see https://issues.apache.org/jira/browse/FLINK-20969. However, Flink 1.15 can be used with newer versions of Scala (including Scala 3) if you exclude Flink's built-in Scala API support and use the Java API instead. See https://flink.apache.org/2022/02/22/scala-free.html for more on this.
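
As a rough sketch of what that could look like in sbt, the dependencies below pull in only the Java artifacts of Flink 1.15, which to my understanding are published without a Scala suffix (hence % instead of %%). The version number 1.15.2 is an assumption chosen for illustration; check the linked post and the artifact listings before relying on it.

```scala
// build.sbt (sketch): Flink 1.15 without the built-in Scala API
val flinkVersion = "1.15.2" // assumed version, adjust as needed

libraryDependencies ++= Seq(
  // Single % because these artifacts carry no Scala version suffix.
  "org.apache.flink" % "flink-clients"         % flinkVersion,
  "org.apache.flink" % "flink-streaming-java"  % flinkVersion,
  "org.apache.flink" % "flink-connector-kafka" % flinkVersion
)
```

With this setup the job is written against the Java DataStream API (org.apache.flink.streaming.api.environment.StreamExecutionEnvironment) called from Scala, so the implicit TypeInformation helpers from flink-streaming-scala are not available.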
