Home > Software design >  How can one use SparkListener callback functions?
How can one use SparkListener callback functions?

Time:01-28

How can one use SparkListener callback functions? Can we only use it for logging info? Is there any way to accumulate sparkListener.onTaskEnd information for analysis in Spark application code?

For example..

sparkSession.sparkContext.addListener(new SomeListener)

class SomeListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd){

  // is there a proper/correct/valid way to accumulate this and use
  // it later in application  code?

  taskEnd.taskInfo

  }
}

Can we get info gathered in callback functions?

CodePudding user response:

This is in context of Spark Structure Streaming. It provides event lifecycle related information which would be helpful for logging, troubleshooting etc. Below is a small implementation.

sparkSession.streams.addListener(listenStreamingQuery)

  val listenStreamingQuery: StreamingQueryListener = new StreamingQueryListener() {

    def onQueryStarted(event: QueryStartedEvent): Unit = {
      println("Query started: "   event.id   " at: "   System.currentTimeMillis)
    }

    def onQueryTerminated(event: QueryTerminatedEvent): Unit = {
      println("Query terminated: "   event.id   " at: "   System.currentTimeMillis)

    }

    def onQueryProgress(event: QueryProgressEvent): Unit = {
      val queryProgress = event.progress
      println("Query progress: "   queryProgress)
      if (queryProgress.numInputRows > 0) {
        // Implement Logging. Query the in memory table 
       val currentDf = spark.sql("select * from schema_counts order by count desc")
        
        // group them together into a single spark partition and overwrite them into a hive table.
        currentDf.repartition(1).write.mode("overwrite").saveAsTable("qa.eventlogging_valid_mixed_schema_counts")
       
        currentDf.show()
      }
      

    }
  }


  • Related