Multiple Spark streaming readstream & writestream

Time:09-13

What is the best way to read, transform, and write streams that have different structures? My application has two readStreams and two writeStreams on two different Kafka topics. The two streams have different structures and go through different transformations. When I write each DataFrame to the console, the output shows only one stream instead of two:

  val df1 = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafkaBroker)
    .option("subscribe", "topic-event-1")
    .load()

  val df2 = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafkaBroker)
    .option("subscribe", "topic-event-2")
    .load()

  eventOneStream(df1)
  eventTwoStream(df2)


  def eventOneStream(dataframe: DataFrame): Unit = {
    // do some transformations
    //then write streams to console
    dataframe
      .writeStream
      .outputMode("append")
      .format("console")
      .start().awaitTermination()
  }

  def eventTwoStream(dataframe: DataFrame): Unit = {
    // do some other type of transformations
    //then write streams to console
    dataframe
      .writeStream
      .outputMode("append")
      .format("console")
      .start().awaitTermination()
  }
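Because the two topics carry different payload structures, each event...Stream function would typically begin by decoding Kafka's binary value column with its own schema before the topic-specific transformations. The sketch below assumes both topics carry JSON; the object name EventSchemas, the helper parseValue and all field names are hypothetical placeholders for the real payloads.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{DoubleType, LongType, StringType, StructType}

object EventSchemas {
  // Hypothetical payload of topic-event-1: {"userId": "...", "action": "...", "ts": 123}
  val eventOneSchema: StructType = new StructType()
    .add("userId", StringType)
    .add("action", StringType)
    .add("ts", LongType)

  // Hypothetical payload of topic-event-2: {"orderId": "...", "amount": 9.99}
  val eventTwoSchema: StructType = new StructType()
    .add("orderId", StringType)
    .add("amount", DoubleType)

  // Kafka's value column is binary: cast it to a string, parse it with the
  // topic's schema, then flatten the resulting struct into top-level columns.
  def parseValue(kafkaDf: DataFrame, schema: StructType): DataFrame =
    kafkaDf
      .select(from_json(col("value").cast("string"), schema).as("event"))
      .select("event.*")
}
```

Inside eventOneStream, for example, the first step would be EventSchemas.parseValue(dataframe, EventSchemas.eventOneSchema). The same call works on a streaming DataFrame because from_json is an ordinary column expression, so each stream keeps its own schema and transformation chain.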

Answer:

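You see only one stream because awaitTermination() on the first query blocks the calling thread until that query stops, so eventTwoStream(df2) is never reached and the second query never starts. Instead, start both queries without waiting on either, then block once with StreamingQueryManager.awaitAnyTermination() (spark.streams returns the session's StreamingQueryManager):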

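  import org.apache.spark.sql.DataFrame
  import org.apache.spark.sql.streaming.StreamingQuery

  // start() returns immediately, so both queries are running after these two lines
  val query1 = eventOneStream(df1)
  val query2 = eventTwoStream(df2)

  // block the driver until either query stops (rethrows the error if it failed)
  spark.streams.awaitAnyTermination()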


  def eventOneStream(dataframe: DataFrame): StreamingQuery = {
    // do some transformations
    //then write streams to console
    dataframe
      .writeStream
      .outputMode("append")
      .format("console")
      .start()
  }

  def eventTwoStream(dataframe: DataFrame): StreamingQuery = {
    // do some other type of transformations
    //then write streams to console
    dataframe
      .writeStream
      .outputMode("append")
      .format("console")
      .start()
  }
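To try the pattern without a Kafka broker, the sketch below swaps in Spark's built-in rate source and memory sink; both substitutions are assumptions made only so the example runs locally with spark-sql on the classpath. Replacing format("rate") with the Kafka readers from the question (which additionally need the spark-sql-kafka-0-10 package) and format("memory") with format("console") gives the same structure. The object and query names are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.StreamingQuery

object TwoStreamsDemo {
  // Starts a query and returns it immediately; no awaitTermination() here.
  def startQuery(df: DataFrame, name: String): StreamingQuery =
    df.writeStream
      .queryName(name)      // the memory sink exposes results as a temp view with this name
      .outputMode("append")
      .format("memory")     // swap for "console" to print batches as in the question
      .start()

  def run(spark: SparkSession, timeoutMs: Long): Seq[StreamingQuery] = {
    // Built-in test source emitting (timestamp, value) rows; stands in for the Kafka readers.
    val source1 = spark.readStream.format("rate").option("rowsPerSecond", "5").load()
    val source2 = spark.readStream.format("rate").option("rowsPerSecond", "5").load()

    // A different transformation per stream, mirroring eventOneStream / eventTwoStream.
    val q1 = startQuery(source1.select((col("value") * 2).as("doubled")), "event_one")
    val q2 = startQuery(source2.filter(col("value") % 2 === 0), "event_two")

    // Both queries are already running; wait until either stops or the timeout elapses.
    spark.streams.awaitAnyTermination(timeoutMs)
    Seq(q1, q2)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("two-streams").getOrCreate()
    val queries = run(spark, 10000L)
    // Stop whatever is still running so the application can exit cleanly.
    spark.streams.active.foreach(_.stop())
    queries.foreach(q => println(s"${q.name} active=${q.isActive}"))
    spark.stop()
  }
}
```

If one query fails, awaitAnyTermination() rethrows its exception, so stopping the remaining active queries afterwards avoids leaving the other stream running unattended. With a sink that requires a checkpoint, each query also needs its own checkpointLocation; two queries cannot share one.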