Spark Streaming subscribe multiple topics and write into multiple topics

Time:07-20

I have several Kafka topics named as follows:

'ingestion_src_api_iq_BTCUSD_1_json', 'ingestion_src_api_iq_BTCUSD_5_json', 'ingestion_src_api_iq_BTCUSD_60_json'

I read all of these topics, which share the same data structure, using the "subscribePattern" option in Spark.

 from pyspark.sql.functions import col, expr, from_json, struct, to_json

 (spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", bootstrap_server)
    .option("subscribePattern", "ingestion_src_api.*")
    .option("startingOffsets", "latest")
    .load()
    # Parse the Kafka value bytes as JSON using the shared schema
    .select(col("topic").cast("string"), from_json(col("value").cast("string"), schema).alias("value"))
    # Reshape the parsed fields and serialize them back to a JSON string
    .select(to_json(struct(expr("value.active_id as active_id"), expr("value.size as timeframe"),
    expr("cast(value.at / 1000000000 as timestamp) as executed_at"), expr("FROM_UNIXTIME(value.from) as candle_from"),
    expr("FROM_UNIXTIME(value.to) as candle_to"), expr("value.id as period"),
    "value.open", "value.close", "value.min", "value.max", "value.ask", "value.bid", "value.volume")).alias("value"))
    .writeStream.format("kafka").option("kafka.bootstrap.servers", bootstrap_server)
    .option("topic", "processed_src_api_iq_data")
    .option("checkpointLocation", "./checkpoint/")
    .start()
    )

How can I write the transformed data to different topics, such as:

'processed_src_api_iq_BTCUSD_1_json', 'processed_src_api_iq_BTCUSD_5_json', 'processed_src_api_iq_BTCUSD_60_json'

With my current code I can only write to a single topic, "processed_src_api_iq_data".

CodePudding user response:

The DataFrame passed to format("kafka") can include a string column named topic. When that column is present and option("topic", ...) is not set, each row's key and/or value is produced to the topic named in its topic column, as documented:

https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#writing-data-to-kafka

The topic column is required if the “topic” configuration option is not specified

Use withColumn to add the necessary values, based on the other columns that you have.
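
As a minimal sketch of the topic-column approach, assuming the output topic is simply the input topic with the ingestion_ prefix replaced by processed_ (as the topic names in the question suggest). The helper names output_topic and with_output_topic are hypothetical, and the field reshaping from the question is omitted for brevity. The pure-Python function mirrors the Spark expression so the renaming rule can be checked without a cluster.

```python
import re


def output_topic(input_topic: str) -> str:
    """Pure-Python mirror of the renaming rule (assumed: swap the prefix only)."""
    return re.sub(r"^ingestion_", "processed_", input_topic)


def with_output_topic(parsed_df):
    """Return a DataFrame with a derived 'topic' column and a JSON 'value' column.

    Assumes parsed_df has a string column 'topic' (the source topic) and a
    struct column 'value', like parsed_df built from the Kafka source.
    """
    # Imported lazily so the renaming rule above can be tested without Spark.
    from pyspark.sql.functions import col, regexp_replace, to_json

    return parsed_df.select(
        # Each row is produced to the topic named in this column.
        regexp_replace(col("topic"), "^ingestion_", "processed_").alias("topic"),
        # Kafka expects 'value' as a string or binary column.
        to_json(col("value")).alias("value"),
    )
```

The result can then be written with writeStream.format("kafka") without setting option("topic", ...). Per the Spark documentation, that option overrides any topic column in the data, so it has to be left out for per-row routing to apply.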


Alternatively, create multiple DataFrames and call writeStream.format("kafka") with an individual option("topic", ...) setting on each.

# Parentheses allow the method chain to span multiple lines in Python
raw_df = (spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", bootstrap_server)
    .option("subscribePattern", "ingestion_src_api.*")
    .option("startingOffsets", "latest")
    .load())

parsed_df = raw_df.select(col("topic").cast("string"), from_json(col("value").cast("string"), schema).alias("value")) 

processed_df = (parsed_df
    .select(to_json(struct(
        expr("value.active_id as active_id"),
        expr("value.size as timeframe"),
        expr("cast(value.at / 1000000000 as timestamp) as executed_at"),
        expr("FROM_UNIXTIME(value.from) as candle_from"),
        expr("FROM_UNIXTIME(value.to) as candle_to"),
        expr("value.id as period"),
        "value.open", "value.close", "value.min", "value.max", "value.ask", "value.bid", "value.volume"
    )).alias("value")))

btc_1 = processed_df.filter( ... something to get just this data )
btc_5 = processed_df.filter( ... etc )

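(btc_1.writeStream.format("kafka")
    .option("topic", "processed_src_api_iq_BTCUSD_1_json")
    # ... remaining options; each query needs its own checkpointLocation, then .start()
)
(btc_5.writeStream.format("kafka")
    .option("topic", "processed_src_api_iq_BTCUSD_5_json")
    # ... remaining options; each query needs its own checkpointLocation, then .start()
)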
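
One way the multiple-DataFrame approach could be filled in is sketched below, with the caveat that the filter condition is an assumption: it filters on the source topic column, so it expects a DataFrame that still carries topic next to value (unlike processed_df above, which selects only value). The names sink_plan, start_queries, df_with_topic and the checkpoint directory layout are hypothetical.

```python
TIMEFRAMES = ["1", "5", "60"]  # taken from the topic names in the question


def sink_plan(symbol="BTCUSD", timeframes=TIMEFRAMES, checkpoint_root="./checkpoint"):
    """Return one (input_topic, output_topic, checkpoint_dir) triple per timeframe."""
    plan = []
    for tf in timeframes:
        suffix = f"src_api_iq_{symbol}_{tf}_json"
        plan.append((
            f"ingestion_{suffix}",
            f"processed_{suffix}",
            f"{checkpoint_root}/{suffix}",  # hypothetical layout, one dir per query
        ))
    return plan


def start_queries(df_with_topic, bootstrap_server):
    """Start one Kafka sink query per output topic and return the query handles."""
    # Imported lazily so sink_plan can be checked without Spark installed.
    from pyspark.sql.functions import col

    queries = []
    for in_topic, out_topic, checkpoint_dir in sink_plan():
        query = (df_with_topic
            .filter(col("topic") == in_topic)   # assumed filter: match the source topic
            .drop("topic")                      # destination comes from the option below
            .writeStream.format("kafka")
            .option("kafka.bootstrap.servers", bootstrap_server)
            .option("topic", out_topic)
            .option("checkpointLocation", checkpoint_dir)
            .start())
        queries.append(query)
    return queries
```

Each started query runs independently with its own checkpoint, so this variant trades the single query of the topic-column approach for one query per output topic.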