Unable to write multiple queries in foreachBatch



import org.apache.spark.sql.SparkSession
import org.apache.spark.sql._
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import java.time.{ZonedDateTime, ZoneId}
import java.time.format.DateTimeFormatter

object SparkStreamingKafka1 {

  def main(args: Array[String]): Unit = {
    System.setProperty("hadoop.home.dir", "C:\\hadoop\\")
    val spark = SparkSession.builder().appName("test").master("local[*]").getOrCreate()
    spark.sparkContext.setLogLevel("OFF")
    import spark.implicits._

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "demo2")
      .option("startingOffsets", "earliest") // read the topic from the beginning
      .load()

    val personStringDF = df.selectExpr("CAST(value AS STRING)")

    val schema = new StructType()
      .add("stock_name", StringType)
      .add("stock_price", IntegerType)
      .add("date", StringType)

    val personDF = personStringDF
      .select(from_json(col("value"), schema).as("data"))
      .select("data.*")

    personDF.createOrReplaceTempView("persontab")

    /* spark.sql("""select min(stock_price) as min_stock_price,
         max(stock_price) as max_stock_price, avg(stock_price) as avg_stock_price from persontab""")
      .writeStream
      .format("console")
      .outputMode("complete")
      .start()
      .awaitTermination() */

    spark.sql("""select min(stock_price) as min_stock_price,
        max(stock_price) as max_stock_price, avg(stock_price) as avg_stock_price from persontab""")
      .writeStream
      .outputMode("complete")
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        println("inside the foreachbatch1")
        batchDF.show()
        batchDF.write.format("jdbc")
          .mode("append")
          .option("url", "jdbc:mysql://localhost:3306/gkstorespipelinedb")
          //.option("driver", "com.mysql.jdbc.driver")
          .option("dbtable", "max_min_avg")
          .option("user", "root")
          .option("password", "root")
          .option("header", "true")
          .save()
        println("saved")
      }
      // .outputMode("complete")
      .start()
      .awaitTermination()
  }

}

I have three queries to run. One uses aggregations, which I have done in the code above, and it is working fine. The other two are WHERE-clause queries. How do I run those two queries here as well? Can the results of all three queries be saved into one table, or do I need to save them in different tables? Please let me know how to do it both ways.

CodePudding user response:

package mysql.kafka.streaming

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql._
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import java.time.{ZonedDateTime, ZoneId}
import java.time.format.DateTimeFormatter
import org.apache.spark.sql.functions.{min, max, avg}

object multiplequerieskafkastreaming {

  def main(args: Array[String]): Unit = {
    System.setProperty("hadoop.home.dir", "C:\\hadoop\\")
    val spark = SparkSession.builder().appName("test").master("local[*]").getOrCreate()
    spark.sparkContext.setLogLevel("OFF")
    import spark.implicits._

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "demo3")
      .option("startingOffsets", "earliest") // read the topic from the beginning
      .load()

    val personStringDF = df.selectExpr("CAST(value AS STRING)")

    val schema = new StructType()
      .add("stock_name", StringType)
      .add("stock_price", IntegerType)
      .add("date", StringType)

    val stockDF = personStringDF
      .select(from_json(col("value"), schema).as("data"))
      .select("data.*")

    // getting yesterday's date
    val yesterday = ZonedDateTime.now(ZoneId.of("UTC")).minusDays(1)
    val formatter = DateTimeFormatter.ofPattern("dd-MM-yyyy")
    val yesterdaydate = formatter.format(yesterday)
    // println(yesterdaydate)

    stockDF.writeStream
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        println("inside the foreachbatch1")
        // cache the micro-batch once, since it is queried three times below
        batchDF.persist()
        batchDF.createOrReplaceTempView("stocktab")
        println(yesterdaydate)

        val Current_stock_pricedf = batchDF.sparkSession.sql(
          "select stock_name,stock_price from stocktab where stock_name='abc'")
        println("select stock_name,stock_price from stocktab where stock_name='abc'")
        Current_stock_pricedf.show()

        val min_max_df = batchDF.sparkSession.sql(
          """select min(stock_price) as min_stock_price,
             max(stock_price) as max_stock_price, avg(stock_price) as avg_stock_price from stocktab""")
        min_max_df.show()

        val Yesterday_stock_pricedf = batchDF.sparkSession.sql(
          "select stock_name,stock_price,date from stocktab where date='" + yesterdaydate + "'")
        // val Yesterday_stock_pricedf = batchDF.sparkSession.sql("select stock_name,stock_price,date from stocktab where date=cast({} as date)".format(yesterdaydate))
        Yesterday_stock_pricedf.show()

        min_max_df.write.format("jdbc")
          .mode("append")
          .option("url", "jdbc:mysql://localhost:3306/gkstorespipelinedb")
          //.option("driver", "com.mysql.jdbc.driver")
          .option("dbtable", "max_min_avg")
          .option("user", "root")
          .option("password", "root")
          .option("header", "true")
          .save()
        println("saved min_max_df")

        Current_stock_pricedf.write.format("jdbc")
          .mode("append")
          .option("url", "jdbc:mysql://localhost:3306/gkstorespipelinedb")
          //.option("driver", "com.mysql.jdbc.driver")
          .option("dbtable", "current_stock_price")
          .option("user", "root")
          .option("password", "root")
          .option("header", "true")
          .save()
        println("saved Current_stock_pricedf")

        Yesterday_stock_pricedf.write.format("jdbc")
          .mode("append")
          .option("url", "jdbc:mysql://localhost:3306/gkstorespipelinedb")
          //.option("driver", "com.mysql.jdbc.driver")
          .option("dbtable", "Yesterday_stock_price")
          .option("user", "root")
          .option("password", "root")
          .option("header", "true")
          .save()
        println("saved Yesterday_stock_pricedf")

        // unpersist only after all three writes so the cached batch is reused
        batchDF.unpersist()
      }
      // .outputMode("complete")
      .start()
      .awaitTermination()

  }

}

The code is working fine. I created a temp view and ran multiple queries inside foreachBatch with the help of df.sparkSession.sql(""). For reference: How to work with temporary tables in foreachBatch?. I still need to add some exception handling to the code to avoid errors (see the sketch below); other than that, the main code is working fine.
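As a rough illustration of that exception handling, each JDBC write inside foreachBatch could be wrapped in a Try so that one failing write is logged instead of terminating the whole streaming query. This is only a minimal sketch reusing the table names and connection settings from the answer above; the helper name safeJdbcWrite and the logging policy are assumptions for illustration, not part of the original code.

import scala.util.{Failure, Success, Try}
import org.apache.spark.sql.DataFrame

// Hypothetical helper: appends a DataFrame to a MySQL table and logs failures
// instead of letting the exception stop the streaming query.
def safeJdbcWrite(df: DataFrame, table: String): Unit = {
  Try {
    df.write.format("jdbc")
      .mode("append")
      .option("url", "jdbc:mysql://localhost:3306/gkstorespipelinedb")
      .option("dbtable", table)
      .option("user", "root")
      .option("password", "root")
      .save()
  } match {
    case Success(_) => println(s"saved $table")
    case Failure(e) => println(s"failed to save $table: ${e.getMessage}")
  }
}

// Inside the foreachBatch body, the three writes would then become:
// safeJdbcWrite(min_max_df, "max_min_avg")
// safeJdbcWrite(Current_stock_pricedf, "current_stock_price")
// safeJdbcWrite(Yesterday_stock_pricedf, "Yesterday_stock_price")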

Thank you @OneCricketeer for your suggestions.
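On the original question of one table versus several: the two WHERE-clause results share the same columns, so they could also be appended to a single table once a label column records which query produced each row, while the min/max/avg result has a different schema and is easier to keep in its own table, as done above. Below is a minimal, untested sketch of the combined write; the query_type column and the stock_prices table name are assumptions for illustration.

import org.apache.spark.sql.functions.lit

// Sketch only, reusing the DataFrames built inside foreachBatch above.
// Pad both results to a common schema, tag each row with the query that
// produced it, then append everything to one table.
val labelled_prices = Current_stock_pricedf
  .withColumn("date", lit(null).cast("string")) // current-price query has no date column
  .withColumn("query_type", lit("current_price"))
  .unionByName(
    Yesterday_stock_pricedf.withColumn("query_type", lit("yesterday_price")))

labelled_prices.write.format("jdbc")
  .mode("append")
  .option("url", "jdbc:mysql://localhost:3306/gkstorespipelinedb")
  .option("dbtable", "stock_prices") // hypothetical combined table
  .option("user", "root")
  .option("password", "root")
  .save()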
