The query finishes successfully and data appears in the sink, but the listener receives empty metrics:
spark.listenerManager.register(new QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
    println(qe.observedMetrics)
    println(qe.executedPlan.metrics)
  }
  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
    // pass
  }
})
result:
Map()
Map()
Why? How do I tell Spark to collect these metrics?
Edit:
Spark 3.1.2 on K8s (I just used the example from the Spark documentation):
val spark = SparkSession
  .builder()
  .appName("test-app")
  .getOrCreate()

// at this point comes the 'spark.listenerManager.register' part

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("subscribe", "input")
  .option("maxOffsetsPerTrigger", 1000)
  .load()

val ds = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("topic", "output")
  .start()

ds.awaitTermination()
CodePudding user response:
QueryExecutionListener is for batch jobs. For streaming jobs, use StreamingQueryListener instead:
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val spark: SparkSession = ...
spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
    println("Query started: " + queryStarted.id)
  }
  override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
    println("Query terminated: " + queryTerminated.id)
  }
  override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
    println("Query made progress: " + queryProgress.progress)
  }
})
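Since the question specifically asked about observedMetrics: for a streaming query those only show up if you attach them with Dataset.observe, and they then arrive in the progress events rather than in QueryExecution. A minimal sketch (Spark 3.x; the metric name "my_metrics" and column alias "rows" are arbitrary choices for illustration):

```scala
import org.apache.spark.sql.functions.{count, lit}
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Attach a named observation to the streaming DataFrame before writing it out.
val observed = df.observe("my_metrics", count(lit(1)).as("rows"))

spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // observedMetrics is a java.util.Map[String, Row], keyed by the observe() name.
    val metrics = event.progress.observedMetrics
    if (metrics.containsKey("my_metrics")) {
      println(metrics.get("my_metrics"))
    }
    // Built-in per-batch metrics are also available here, e.g.:
    println("numInputRows: " + event.progress.numInputRows)
  }
})
```

Then start the writeStream on `observed` instead of `df`; each micro-batch will emit a progress event carrying both the built-in metrics and the observed row.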