Spark metrics are empty

Time: 02-16

The query finished successfully and the data appeared in the sink, but the listener received empty metrics:

  spark.listenerManager.register(new QueryExecutionListener {
    override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
      println(qe.observedMetrics)
      println(qe.executedPlan.metrics)
    }

    override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
      // pass
    }
  })

result:

Map()
Map()

Why? How do I tell Spark to collect these metrics?

Edit:

Spark 3.1.2 on Kubernetes (I just used the example from the Spark documentation):

val spark = SparkSession
  .builder()
  .appName("test-app")
  .getOrCreate()

// at this point comes the 'spark.listenerManager.register' part

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("subscribe", "input")
  .option("maxOffsetsPerTrigger", 1000)
  .load()

val ds = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("topic", "output")
  .start()

ds.awaitTermination()

CodePudding user response:

QueryExecutionListener is for batch jobs. For streaming jobs, use StreamingQueryListener instead:

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryStartedEvent, QueryProgressEvent, QueryTerminatedEvent}

val spark: SparkSession = ...

spark.streams.addListener(new StreamingQueryListener() {
    override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
        println("Query started: " + queryStarted.id)
    }
    override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
        println("Query terminated: " + queryTerminated.id)
    }
    override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
        println("Query made progress: " + queryProgress.progress)
    }
})

See the Structured Streaming programming guide: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#reporting-metrics-programmatically-using-asynchronous-apis
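Each `QueryProgressEvent` carries a `StreamingQueryProgress` object with per-batch metrics. A minimal sketch of pulling out individual fields rather than printing the whole object (field names follow the `StreamingQueryProgress` API; where and how you ship the values is up to you):

```scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryStartedEvent, QueryProgressEvent, QueryTerminatedEvent}

spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryStarted(e: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(e: QueryTerminatedEvent): Unit = ()

  override def onQueryProgress(e: QueryProgressEvent): Unit = {
    val p = e.progress
    // Per-micro-batch throughput metrics exposed by StreamingQueryProgress.
    println(s"batch=${p.batchId} " +
      s"numInputRows=${p.numInputRows} " +
      s"inputRowsPerSecond=${p.inputRowsPerSecond} " +
      s"processedRowsPerSecond=${p.processedRowsPerSecond}")
    // p.json returns the full progress report as a JSON string,
    // which is convenient for shipping to a log aggregator.
  }
})
```

Note the listener runs asynchronously, so progress lines arrive after each micro-batch completes, not in lockstep with the query.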
