What does the increasing behavior of spark_driver_dagscheduler_messageprocessingtime_type_timers_count mean?


We have a Spark job (see the code below):

package io.companyname.industry.streaming

import java.nio.file.{Files, Paths, StandardCopyOption}
import java.util.TimeZone

import io.companyname.core.utils.AppConfig._
import io.companyname.core.utils.CloudFiles
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.types._
import org.apache.spark.sql.{ColumnName, DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel

object KafkaDataGeneratorJob extends CloudFiles {
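  // Note (assumption): copy(...) and kafkaLocation, used further below, are
  // presumably provided by the company-internal CloudFiles trait and the
  // AppConfig import above; they are not defined in this file.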
  private val hdfsHost = System.getenv("HDFS_HOST")
  private val hdfsPort = System.getenv("HDFS_PORT")

  val hdfsLocation: String = s"$hdfsHost:$hdfsPort" // e.g. "hdfs-hadoop-hdfs-nn:9000"

  val digitalTwinId = "NP20100000"
  val homeZeppelinPrefix = "/home/zeppelin/"

  def readFromCSVFile(path: String, shortPath: String)(implicit spark: SparkSession): DataFrame = {

    implicit val hadoopConf = spark.sparkContext.hadoopConfiguration
    copy("file:///" path, "hdfs://" hdfsLocation homeZeppelinPrefix "/generator/" shortPath)

    println(path)
    val schema = StructType(
      StructField("id", LongType, nullable = false) ::
        StructField("Energy Data", StringType, nullable = false) ::
        StructField("Distance", StringType, nullable = false) ::
        StructField("Humidity", StringType, nullable = false) ::
        StructField("Ambient Temperature", StringType, nullable = false) ::
        StructField("Cold Water Temperature", StringType, nullable = false) ::
        StructField("Vibration Value 1", StringType, nullable = false) ::
        StructField("Vibration Value 2", StringType, nullable = false) ::
        StructField("Handle Movement", StringType, nullable = false) ::
        Nil)

    spark.read
      .format("csv")
      //.option("inferSchema", "true") //is it working?
      //.option("header", "true") //is it working?
      .schema(schema)
      .option("delimiter", ";")
      .option("maxFilesPerTrigger", 1)
      .csv("hdfs://" hdfsLocation homeZeppelinPrefix "/generator/" shortPath)
        .persist(StorageLevel.MEMORY_ONLY_SER)
  }

  def generateStream(divider: Int)(implicit spark: SparkSession): DataFrame = {
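    // The built-in "rate" source emits rowsPerSecond rows per second with two
    // columns: timestamp (event time) and value (a monotonically increasing Long).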
    val rate = spark.readStream
      .format("rate")
      .option("rowsPerSecond", 1)
      .option("numPartitions", 10)
      .load()
    //.withWatermark("timestamp", "1 seconds")

    //    println("Rate schema:")
    //    println(rate.schema.fields.mkString)
    //
    //    println("Rate plan:")
    //    println(rate.queryExecution.toString)

    //Thread.sleep(1000000)
    println("Preparing to load the csv file from resource kikai.csv")
    val input = getClass.getResourceAsStream("kikai.csv")
    println("Loaded file from resource kikai.csv")
    println("Input stream from resource kikai.csv details: " input.toString)
    println("Creating temp file")
    val tmpPath = Files.createTempDirectory("csv")
    println("Creating temp directory path details: " tmpPath.getFileName)
    val pathCopiedFIle = Paths.get(tmpPath.toString, "kikai.csv")
    println("Creating temp file path details: " pathCopiedFIle.getFileName)
    Files.copy(input, pathCopiedFIle, StandardCopyOption.REPLACE_EXISTING)
    println("Creating temp file succeed")

    val cvsStream = readFromCSVFile(tmpPath.toString, pathCopiedFile.getFileName.toString)
    //println("CSV schema:")
    //println(cvsStream.schema.fields.mkString)
    //
    //    println("Before join:")
    //    println(cvsStream.queryExecution.toString)

    //val cvsStream2 = cvsStream.as("csv").join(rate.as("counter")).where("csv.id == mod(counter.value,10)").withWatermark("timestamp", "1 seconds")
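    // A join with no condition is a cross join of the unbounded rate stream with
    // the persisted static CSV; the where clause then keeps, for each rate row,
    // only the CSV row whose id equals value mod divider.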
    val cvsStream2 = rate.as("counter").join(cvsStream.as("csv")).where("csv.id == mod(counter.value," + divider + ")").withWatermark("timestamp", "1 seconds")
    println("CSV schema:")
    println(cvsStream2.schema.fields.mkString)
    cvsStream2
    //    println("After join:")
    //    println(cvsStream2.queryExecution.toString)

  }

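  // Each call to this method starts an independent streaming query: one Kafka
  // writer per column, all publishing to the same "mqtt" topic, each with its
  // own checkpoint location and query name.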
  def streamMqttMessageForSpecificColumn(columnName: String, dataFrame: DataFrame, topicName: String): StreamingQuery = {
    implicit val spark: SparkSession = SparkSession
      .builder
      //.master("local[1]")
      .config(
        new SparkConf().setIfMissing("spark.master", "local[*]")
          .set("spark.default.parallelism", "1")
          .set("spark.sql.shuffle.partitions", "1")
          .set("spark.sql.streaming.metricsEnabled", "true")
          .set("spark.sql.session.timeZone", "UTC")
          .set("spark.eventLog.dir", "file:///tmp/spark-events")
      ).getOrCreate()

    spark.conf.set("spark.sql.session.timeZone", "UTC")
    TimeZone.setDefault(TimeZone.getTimeZone("UTC"))
    Logger.getRootLogger.setLevel(Level.WARN)

    dataFrame.select(
      lit(topicName) as "key",
      //Energy Data = 2021-10-13T11:27:32.222Z : NP20100000 : 0.00 246.47 0.00
      concat(date_format(col("timestamp"),"yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"), lit(" : "), lit(digitalTwinId), lit(" : "), lit(new ColumnName(columnName))) as "value")
      .writeStream
      //.trigger(Trigger.ProcessingTime(10))
      .format("kafka")
      //.format("console")
      //.option("truncate", "false")
      .option("checkpointLocation", s"hdfs://$hdfsLocation/checkpoint/kafkaDataGenerator$columnName")
      .option("kafka.bootstrap.servers", kafkaLocation)
      .option("topic", "mqtt")
      .queryName("kafkaDataGenerator"   columnName)
      .start()
  }

  def generateStream()(implicit spark: SparkSession): Unit = {
    @transient lazy val LOG = Logger.getLogger(getClass.getName)

    implicit val spark: SparkSession = SparkSession
      .builder
      .master("local[1]")
      //.config(new SparkConf().setIfMissing("spark.master", "local[1]"))
      .getOrCreate()

    spark.conf.set("spark.sql.session.timeZone", "UTC")
    TimeZone.setDefault(TimeZone.getTimeZone("UTC"))
    Logger.getRootLogger.setLevel(Level.WARN)

    val df = generateStream(10)

    //    println("CSV schema:")
    //    println(df.schema.fields.mkString)

    streamMqttMessageForSpecificColumn("Energy Data", df, "ESP_02/Energy Data")
    streamMqttMessageForSpecificColumn("Distance", df, "ESP_02/Distance")
    streamMqttMessageForSpecificColumn("Humidity", df, "ESP_02/Humidity")
    streamMqttMessageForSpecificColumn("Ambient Temperature", df,"ESP_02/Ambient Temperature")
    streamMqttMessageForSpecificColumn("Cold Water Temperature", df,"ESP_02/Cold Water Temperature")
    streamMqttMessageForSpecificColumn("Vibration Value 1", df,"ESP_02/Vibration Value 1")
    streamMqttMessageForSpecificColumn("Vibration Value 2", df,"ESP_02/Vibration Value 2")
    streamMqttMessageForSpecificColumn("Handle Movement", df,"ESP_01/Handle Movement")

    //    println("After start:")
    //    println(df.queryExecution.toString)
    //
    //spark.streams.awaitAnyTermination(3000000)

  }

  def main(args: Array[String]): Unit = {
    @transient lazy val LOG = Logger.getLogger(getClass.getName)

    implicit val spark: SparkSession = SparkSession
      .builder
      .master("local[1]")
      //.config(new SparkConf().setIfMissing("spark.master", "local[1]"))
      .getOrCreate()

    spark.conf.set("spark.sql.session.timeZone", "UTC")
    TimeZone.setDefault(TimeZone.getTimeZone("UTC"))
    Logger.getRootLogger.setLevel(Level.WARN)

    val df = generateStream(10)

    //    println("CSV schema:")
    //    println(df.schema.fields.mkString)

    streamMqttMessageForSpecificColumn("Energy Data", df, "ESP_02/Energy Data")
    streamMqttMessageForSpecificColumn("Distance", df, "ESP_02/Distance")
    streamMqttMessageForSpecificColumn("Humidity", df, "ESP_02/Humidity")
    streamMqttMessageForSpecificColumn("Ambient Temperature", df,"ESP_02/Ambient Temperature")
    streamMqttMessageForSpecificColumn("Cold Water Temperature", df,"ESP_02/Cold Water Temperature")
    streamMqttMessageForSpecificColumn("Vibration Value 1", df,"ESP_02/Vibration Value 1")
    streamMqttMessageForSpecificColumn("Vibration Value 2", df,"ESP_02/Vibration Value 2")
    streamMqttMessageForSpecificColumn("Handle Movement", df,"ESP_01/Handle Movement")

    //    println("After start:")
    //    println(df.queryExecution.toString)
    //
    spark.streams.awaitAnyTermination()
  }

}

During execution, the job fails with an OOM roughly every 13 seconds:

[screenshot: the OOM failure]

As for specific Spark metrics, only spark_driver_dagscheduler_messageprocessingtime_type_timers_count looks to be increasing dramatically:

[screenshot: the metric increasing steadily over time]
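Metric names of this shape are what Spark's Dropwizard-based metrics system reports through a Prometheus-style sink. As a sketch only (assuming Spark 3's built-in PrometheusServlet sink; the actual setup used here may differ), such metrics can be exposed via conf/metrics.properties:

*.sink.prometheusServlet.class=org.apache.spark.metrics.sink.PrometheusServlet
*.sink.prometheusServlet.path=/metrics/prometheus

The driver then serves them under its UI port, e.g. http://<driver-host>:4040/metrics/prometheus.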

Is there any way to understand what kind of leak I have here?

CodePudding user response:

From the PR that introduced this metric:

This commit adds a new metric, messageProcessingTime, to the DAGScheduler metrics source. This metric tracks the time taken to process messages in the scheduler's event-processing loop, which is a helpful debugging aid for diagnosing performance issues in the scheduler (such as SPARK-4961).
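Note that messageProcessingTime is a Dropwizard Timer (Spark's metrics system is built on the Dropwizard Metrics library), and the _count component exposed by the sink is the cumulative number of timed events. It grows monotonically for as long as the scheduler keeps processing events, so a steadily increasing count is expected and is not by itself evidence of a leak; the timer's mean and percentile components are what would indicate the scheduler getting slower. A minimal sketch illustrating this with the Dropwizard library directly (object name hypothetical; requires the metrics-core dependency):

import com.codahale.metrics.MetricRegistry

object TimerCountDemo extends App {
  val registry = new MetricRegistry
  // a Timer combines a histogram of durations with a meter of event rates
  val timer = registry.timer("messageProcessingTime")

  for (_ <- 1 to 100) {
    val ctx = timer.time() // start timing one "message"
    Thread.sleep(1)        // simulate work
    ctx.stop()             // record the duration
  }

  // getCount is cumulative and never decreases, no matter how fast the
  // individual events were: an ever-growing *_count only means that events
  // keep being processed.
  println(s"count = ${timer.getCount}")                         // 100
  println(s"mean duration (ns) = ${timer.getSnapshot.getMean}") // roughly 1e6
}

So the growth of this counter does not identify the OOM cause; the leak has to be found elsewhere (for example, in a driver heap dump).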
