I have some Kafka topics with the naming convention below:
'ingestion_src_api_iq_BTCUSD_1_json', 'ingestion_src_api_iq_BTCUSD_5_json', 'ingestion_src_api_iq_BTCUSD_60_json'
I'm reading all these topics, which have the same data structure, using the "subscribePattern" option in Spark:
(spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", bootstrap_server)
.option("subscribePattern", "ingestion_src_api.*")
.option("startingOffsets", "latest")
.load()
.select(col("topic").cast("string"), from_json(col("value").cast("string"), schema).alias("value"))
.select(to_json(struct(expr("value.active_id as active_id"), expr("value.size as timeframe"),
expr("cast(value.at / 1000000000 as timestamp) as executed_at"), expr("FROM_UNIXTIME(value.from) as candle_from"),
expr("FROM_UNIXTIME(value.to) as candle_to"), expr("value.id as period"),
"value.open", "value.close", "value.min", "value.max", "value.ask", "value.bid", "value.volume")).alias("value"))
.writeStream.format("kafka").option("kafka.bootstrap.servers", bootstrap_server)
.option("topic", "processed_src_api_iq_data")
.option("checkpointLocation", f"./checkpoint/")
.start()
)
How could I write the transformed data into different topics, like:
'processed_src_api_iq_BTCUSD_1_json', 'processed_src_api_iq_BTCUSD_5_json', 'processed_src_api_iq_BTCUSD_60_json'
With my code I am only able to write to a single topic, "processed_src_api_iq_data".
CodePudding user response:
The outgoing dataframe given to format("kafka") can include a string column named topic, which determines which topic each row's value and/or key byte/string columns are produced to, rather than using the option. As documented:

"The topic column is required if the 'topic' configuration option is not specified."
Use withColumn to add the necessary topic value to each row, based on the other columns that you have; a sketch follows.
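A minimal sketch of that approach, assuming the output topic names are the input names with the ingestion_ prefix swapped for processed_ (that mapping is inferred from the names in the question, not stated there). Here processed_df stands for a dataframe that keeps the incoming topic column next to the JSON value column, as built in the code further below:

from pyspark.sql.functions import col, regexp_replace

# Derive each row's destination topic from its source topic, e.g.
# 'ingestion_src_api_iq_BTCUSD_1_json' -> 'processed_src_api_iq_BTCUSD_1_json'
# (the prefix rewrite is an assumption based on the topic names in the question)
routed_df = processed_df.withColumn(
    "topic", regexp_replace(col("topic"), "^ingestion_", "processed_")
)

# Because a 'topic' column is present, no .option("topic", ...) is set on the sink
(routed_df.writeStream.format("kafka")
    .option("kafka.bootstrap.servers", bootstrap_server)
    .option("checkpointLocation", "./checkpoint/")
    .start())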
Alternatively, split the stream into multiple dataframes and call writeStream.format("kafka") with an individual option("topic", ...) setting on each:
from pyspark.sql.functions import col, expr, from_json, struct, to_json

raw_df = (spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", bootstrap_server)
    .option("subscribePattern", "ingestion_src_api.*")
    .option("startingOffsets", "latest")
    .load())
parsed_df = raw_df.select(col("topic").cast("string"), from_json(col("value").cast("string"), schema).alias("value"))
processed_df = parsed_df.select(
    col("topic"),  # keep the source topic so the stream can be split per topic below
    to_json(struct(
expr("value.active_id as active_id"),
expr("value.size as timeframe"),
expr("cast(value.at / 1000000000 as timestamp) as executed_at"),
expr("FROM_UNIXTIME(value.from) as candle_from"),
expr("FROM_UNIXTIME(value.to) as candle_to"),
expr("value.id as period"),
"value.open", "value.close", "value.min", "value.max", "value.ask", "value.bid", "value.volume"
)).alias("value"))
btc_1 = processed_df.filter( ... something to get just this data, e.g. a predicate on the topic column )
btc_5 = processed_df.filter( ... etc )
(btc_1.writeStream.format("kafka")
    .option("topic", "processed_src_api_iq_BTCUSD_1_json")
    # ... remaining options, then .start()
)
(btc_5.writeStream.format("kafka")
    .option("topic", "processed_src_api_iq_BTCUSD_5_json")
    # ... remaining options, then .start()
)