Spark Structured Streaming: StreamingQuery.awaitTermination() does not exit after writing all data to

Time:05-10

I'm writing a small Scala program that should read a small DataFrame, write it to Kafka, and terminate.

I do it with Spark Structured Streaming. The data is written to Kafka (a console consumer shows that everything arrives), but the StreamingQuery that successfully writes to Kafka then hangs in its awaitTermination() method.

How can I cope with it?

Here's my Scala code to reproduce the problem:

package part4integrations

import org.apache.spark.sql.SparkSession
import common._

object IntegratingKafkaDemo {

  val spark = SparkSession.builder()
    .appName("Integrating Kafka")
    .master("local[2]")
    .getOrCreate()

  spark.sparkContext.setLogLevel("ERROR")

  def writeToKafka() = {
    val carsDF = spark.readStream
      .schema(carsSchema)
      .json("src/main/resources/data/cars")

    val carsKafkaDF = carsDF.selectExpr("upper(Name) as key", "Name as value")

    // kafka: writing but NOT exiting, WTF?!
    carsKafkaDF.writeStream.format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "rockthejvm")
      .option("checkpointLocation", "checkpoints_demo")
      .start().awaitTermination()

  }

  def main(args: Array[String]): Unit = {
    writeToKafka()
  }
}

And here's the JSON file, cars/cars.json:

{"Name":"chevrolet chevelle malibu", "Miles_per_Gallon":18, "Cylinders":8, "Displacement":307, "Horsepower":130, "Weight_in_lbs":3504, "Acceleration":12, "Year":"1970-01-01", "Origin":"USA"}
{"Name":"buick skylark 320", "Miles_per_Gallon":15, "Cylinders":8, "Displacement":350, "Horsepower":165, "Weight_in_lbs":3693, "Acceleration":11.5, "Year":"1970-01-01", "Origin":"USA"}
{"Name":"plymouth satellite", "Miles_per_Gallon":18, "Cylinders":8, "Displacement":318, "Horsepower":150, "Weight_in_lbs":3436, "Acceleration":11, "Year":"1970-01-01", "Origin":"USA"}
{"Name":"amc rebel sst", "Miles_per_Gallon":16, "Cylinders":8, "Displacement":304, "Horsepower":150, "Weight_in_lbs":3433, "Acceleration":12, "Year":"1970-01-01", "Origin":"USA"}
{"Name":"ford torino", "Miles_per_Gallon":17, "Cylinders":8, "Displacement":302, "Horsepower":140, "Weight_in_lbs":3449, "Acceleration":10.5, "Year":"1970-01-01", "Origin":"USA"}

CodePudding user response:

A stream is by definition unbounded, which is why the query never finishes. You have two possible solutions:

  • use spark.read and then df.write.format("kafka"), as mentioned by @OneCricketeer. This can get more complex if you need to handle only new files, because you will have to track somewhere which files have already been processed and which have not

  • use an explicit trigger, Trigger.Once or Trigger.AvailableNow (in Spark 3.3), to process only the data that is new since the previous run and then finish, instead of waiting indefinitely. See the documentation for more examples. In your case it could be something like this:

import org.apache.spark.sql.streaming.Trigger

carsKafkaDF.writeStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "rockthejvm")
  .option("checkpointLocation", "checkpoints_demo")
  .trigger(Trigger.Once()) // process the data that is available, then stop the query
  .start().awaitTermination()
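
For Spark 3.3 or later, the Trigger.AvailableNow variant mentioned in the second option might look roughly like the sketch below. It is a complete program only under some assumptions: the object name WriteToKafkaAvailableNow and the nameOnlySchema value are made up for illustration (the question's carsSchema is not shown, so only the Name column is declared here), and the spark-sql-kafka-0-10 connector is assumed to be on the classpath.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical object name, used only for this sketch.
object WriteToKafkaAvailableNow {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Integrating Kafka")
      .master("local[2]")
      .getOrCreate()

    // Assumption: stands in for carsSchema from the question; only Name is read.
    val nameOnlySchema = StructType(Seq(StructField("Name", StringType)))

    val carsKafkaDF = spark.readStream
      .schema(nameOnlySchema)
      .json("src/main/resources/data/cars")
      .selectExpr("upper(Name) as key", "Name as value")

    val query = carsKafkaDF.writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "rockthejvm")
      .option("checkpointLocation", "checkpoints_demo")
      // Process everything that is available when the query starts, then stop.
      .trigger(Trigger.AvailableNow())
      .start()

    // Returns once the available data has been processed and the query has stopped.
    query.awaitTermination()
    spark.stop()
  }
}
```

Note that the file source records the files it has already processed in the checkpoint directory, so a second run against the same checkpoints_demo directory writes nothing unless new files have appeared.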

CodePudding user response:

It's not "freezing"; it is waiting for new files to arrive in that directory.

If you want to start the job and have it end, run it as a batch job with spark.read.json and df.write.format("kafka") rather than the readStream/writeStream methods.
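
A minimal sketch of that batch version, reusing the paths, topic, and broker address from the question. The object name WriteToKafkaBatch is made up for illustration, the schema is inferred from the JSON files instead of using the question's carsSchema, and the spark-sql-kafka-0-10 connector is assumed to be on the classpath.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical object name, used only for this sketch.
object WriteToKafkaBatch {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Integrating Kafka")
      .master("local[2]")
      .getOrCreate()

    // Batch read: the schema is inferred from the JSON files in the directory.
    val carsDF = spark.read.json("src/main/resources/data/cars")

    // The Kafka sink expects a value column (and optionally a key column).
    carsDF.selectExpr("upper(Name) as key", "Name as value")
      .write
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "rockthejvm")
      .save() // returns when the write completes; no checkpoint is involved

    spark.stop()
  }
}
```

Because this is a plain batch job, every run rewrites all files in the directory to the topic; nothing tracks which files were sent before.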
