I'm writing a small Scala program that should get some small dataframe, write it to Scala and terminate.
I do it with Spark Structured Streaming. The data is written to Kafka (console consumer shows that everything is okay), but StreamingQuery
, that successfully writes to Kafka, is freezing on awaitTermination()
method.
How can I cope with it?
Here's my Scala core to reproduce the problem:
package part4integrations
import org.apache.spark.sql.SparkSession
import common._
object IntegratingKafkaDemo {
val spark = SparkSession.builder()
.appName("Integrating Kafka")
.master("local[2]")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
def writeToKafka() = {
val carsDF = spark.readStream
.schema(carsSchema)
.json("src/main/resources/data/cars")
val carsKafkaDF = carsDF.selectExpr("upper(Name) as key", "Name as value")
// kafka: writing but NOT exiting, WTF?!
carsKafkaDF.writeStream.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "rockthejvm")
.option("checkpointLocation", "checkpoints_demo")
.start().awaitTermination()
}
def main(args: Array[String]): Unit = {
writeToKafka()
}
}
And here's the json cars/cars.json
{"Name":"chevrolet chevelle malibu", "Miles_per_Gallon":18, "Cylinders":8, "Displacement":307, "Horsepower":130, "Weight_in_lbs":3504, "Acceleration":12, "Year":"1970-01-01", "Origin":"USA"}
{"Name":"buick skylark 320", "Miles_per_Gallon":15, "Cylinders":8, "Displacement":350, "Horsepower":165, "Weight_in_lbs":3693, "Acceleration":11.5, "Year":"1970-01-01", "Origin":"USA"}
{"Name":"plymouth satellite", "Miles_per_Gallon":18, "Cylinders":8, "Displacement":318, "Horsepower":150, "Weight_in_lbs":3436, "Acceleration":11, "Year":"1970-01-01", "Origin":"USA"}
{"Name":"amc rebel sst", "Miles_per_Gallon":16, "Cylinders":8, "Displacement":304, "Horsepower":150, "Weight_in_lbs":3433, "Acceleration":12, "Year":"1970-01-01", "Origin":"USA"}
{"Name":"ford torino", "Miles_per_Gallon":17, "Cylinders":8, "Displacement":302, "Horsepower":140, "Weight_in_lbs":3449, "Acceleration":10.5, "Year":"1970-01-01", "Origin":"USA"}
CodePudding user response:
Stream is by definition infinite, that's why it didn't finish. You have two possible solutions:
use
spark.read
and thendf.write.format("kafka")
as mentioned by @OneCricketeer, but it could be more complex if you need to handle only new files, as you will need to track somewhere which files were already processed, and which notuse explicit trigger with
Trigger.Once
orTrigger.AvailableNow
(in Spark 3.3) to process only new data since previous run, and finish, instead of waiting indifinitely. See documentation for more examples. In your case it could be something like this:
carsKafkaDF.writeStream.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "rockthejvm")
.option("checkpointLocation", "checkpoints_demo")
.trigger(Trigger.Once())
.start().awaitTermination()
CodePudding user response:
It's not "freezing", it is waiting for a new batch files in that directory.
If you want to start the job, and have it end, you want to batch with spark.read.json
and spark.write.format("kafka")
rather than use the read/write Stream
methods