What is the best way of reading multiple kafka topics in spark streaming?


I'm developing an application with Spark Streaming that reads from multiple Kafka topics, and I would like to know whether the solution below is the best way of doing that.

from pyspark.sql.functions import col, expr, from_json, struct, to_json

bootstrap_server = '10.108.123.238:9092'

topics = [
'ingest_src_api_iq_BTCUSD_1_json', 'ingest_src_api_iq_BTCUSD_5_json', 'ingest_src_api_iq_BTCUSD_60_json', 'ingest_src_api_iq_BTCUSD_300_json', 'ingest_src_api_iq_BTCUSD_900_json', 'ingest_src_api_iq_BTCUSD_1800_json', 'ingest_src_api_iq_BTCUSD_3600_json', 'ingest_src_api_iq_BTCUSD_14400_json', 'ingest_src_api_iq_BTCUSD_86400_json',
'ingest_src_api_iq_XRPUSD_1_json', 'ingest_src_api_iq_XRPUSD_5_json', 'ingest_src_api_iq_XRPUSD_60_json', 'ingest_src_api_iq_XRPUSD_300_json', 'ingest_src_api_iq_XRPUSD_900_json', 'ingest_src_api_iq_XRPUSD_1800_json', 'ingest_src_api_iq_XRPUSD_3600_json', 'ingest_src_api_iq_XRPUSD_14400_json', 'ingest_src_api_iq_XRPUSD_86400_json',
'ingest_src_api_iq_ETHUSD_1_json', 'ingest_src_api_iq_ETHUSD_5_json', 'ingest_src_api_iq_ETHUSD_60_json', 'ingest_src_api_iq_ETHUSD_300_json', 'ingest_src_api_iq_ETHUSD_900_json', 'ingest_src_api_iq_ETHUSD_1800_json', 'ingest_src_api_iq_ETHUSD_3600_json', 'ingest_src_api_iq_ETHUSD_14400_json', 'ingest_src_api_iq_ETHUSD_86400_json',
'ingest_src_api_iq_OMGUSD_1_json', 'ingest_src_api_iq_OMGUSD_5_json', 'ingest_src_api_iq_OMGUSD_60_json', 'ingest_src_api_iq_OMGUSD_300_json', 'ingest_src_api_iq_OMGUSD_900_json', 'ingest_src_api_iq_OMGUSD_1800_json', 'ingest_src_api_iq_OMGUSD_3600_json', 'ingest_src_api_iq_OMGUSD_14400_json', 'ingest_src_api_iq_OMGUSD_86400_json',
'ingest_src_api_iq_TRXUSD_1_json', 'ingest_src_api_iq_TRXUSD_5_json', 'ingest_src_api_iq_TRXUSD_60_json', 'ingest_src_api_iq_TRXUSD_300_json', 'ingest_src_api_iq_TRXUSD_900_json', 'ingest_src_api_iq_TRXUSD_1800_json', 'ingest_src_api_iq_TRXUSD_3600_json', 'ingest_src_api_iq_TRXUSD_14400_json', 'ingest_src_api_iq_TRXUSD_86400_json',
]

schema = "active_id INT, size INT, at STRING, from STRING, to STRING, id INT, open FLOAT, close FLOAT, min FLOAT, max FLOAT, ask FLOAT, bid FLOAT, volume FLOAT, phase STRING"


# One streaming query per topic: read the raw Kafka records and parse the JSON payload
for topic in topics:
    df = (spark.readStream.format("kafka")
                        .option("kafka.bootstrap.servers", bootstrap_server)
                        .option("subscribe", topic)
                        .option("startingOffsets", "latest")
                        .load()
                        .select(from_json(col("value").cast("string"), schema).alias("value"))
                        )

    # Reshape the parsed fields, serialize them back to JSON and write them to the
    # corresponding "processed" topic, with a separate checkpoint per query
    query = (df.select(to_json(struct(expr("value.active_id as active_id"), expr("value.size as timeframe"),
                        expr("cast(value.at / 1000000000 as timestamp) as executed_at"), expr("FROM_UNIXTIME(value.from) as candle_from"),
                        expr("FROM_UNIXTIME(value.to) as candle_to"), expr("value.id as period"),
                        "value.open", "value.close", "value.min", "value.max", "value.ask", "value.bid", "value.volume")).alias("value"))
                        .writeStream.format("kafka").option("kafka.bootstrap.servers", bootstrap_server)
                        .option("topic", topic.replace('ingest', 'processed'))
                        .option("checkpointLocation", f"./checkpoint_{topic.replace('ingest', 'processed')}/")
                        .start()
                        )

I'm using a for loop to read all the Kafka topics. Is this the best option in terms of processing and memory usage?

CodePudding user response:

Since the processing logic in your example is the same for all topics, you can replace

.option("subscribe", topic)

with

.option("subscribePattern", "ingest_src_api.*")

Behind the scenes, this creates a single (distributed) Kafka consumer group for all partitions of all those topics, and a single Spark lineage, as opposed to one consumer group per topic, which is what your code currently creates.
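For example, the read side could collapse into a single stream like this (a minimal sketch reusing the bootstrap_server, schema and imports from your code; the Kafka source also exposes a topic metadata column, so each record still carries the name of the topic it came from):

raw = (spark.readStream.format("kafka")
                       .option("kafka.bootstrap.servers", bootstrap_server)
                       .option("subscribePattern", "ingest_src_api.*")
                       .option("startingOffsets", "latest")
                       .load())

# keep the source topic next to the parsed payload
parsed = raw.select(col("topic"), from_json(col("value").cast("string"), schema).alias("value"))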

If new topics matching that regex pattern are created later, they will be picked up automatically as well, after some delay: the check runs every metadata.max.age.ms, which defaults to 5 minutes, and you can override it with .option("kafka.metadata.max.age.ms", "3000") or so.
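On the write side, a single query can replace the loop as well. One option (a sketch, not your exact field mapping) is to derive the output topic per record and let the Kafka sink route on a topic column, which it does whenever no topic option is set on the writer; a single checkpoint location then covers everything:

from pyspark.sql.functions import regexp_replace

# map ingest_... to processed_... per record and serialize the struct back to JSON;
# your per-field renaming expressions would slot into the select as before
out = (parsed.withColumn("topic", regexp_replace(col("topic"), "^ingest", "processed"))
             .select("topic", to_json(col("value")).alias("value")))

query = (out.writeStream.format("kafka")
            .option("kafka.bootstrap.servers", bootstrap_server)
            .option("checkpointLocation", "./checkpoint_processed/")
            .start())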
