I'm trying to send data to my Data Lake with a while loop.
Basically, the intention is to continually loop through code and send data to my Data Lake whenever data is received from my Azure Service Bus, using the following code:
This code receives a message from my Service Bus:
import json
from azure.servicebus import ServiceBusClient

def myfunc():
    with ServiceBusClient.from_connection_string(CONNECTION_STR) as client:
        # max_wait_time specifies how long the receiver should wait with no incoming
        # messages before stopping receipt. Default is None, i.e. receive forever.
        with client.get_queue_receiver(QUEUE_NAME, session_id=session_id, max_wait_time=5) as receiver:
            for msg in receiver:
                # print("Received: " + str(msg))
                themsg = json.loads(str(msg))
                # complete the message so that the message is removed from the queue
                receiver.complete_message(msg)
                return themsg
This code assigns the message to a variable:
result = myfunc()
The following code sends the message to my data lake:
rdd = sc.parallelize([json.dumps(result)])
spark.read.json(rdd) \
    .write.mode("overwrite").json('/mnt/lake/RAW/FormulaClassification/F1Area/')
I would like help looping through the code so that it continually checks for messages and sends the results to my data lake.
I believe the solution is accomplished with a while loop, but I'm not sure.
CodePudding user response:
Just because you're using Spark doesn't mean you cannot loop.
First of all, you're only returning the first message from your receiver, so it should look like this:
with client.get_queue_receiver(QUEUE_NAME, session_id=session_id, max_wait_time=5) as receiver:
    # take only the first message from the iterator
    msg = next(receiver)
    # print("Received: " + str(msg))
    themsg = json.loads(str(msg))
    # complete the message so that the message is removed from the queue
    receiver.complete_message(msg)
    return themsg
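One caveat worth noting: once max_wait_time expires with no incoming messages, next(receiver) raises StopIteration, so a loop that keeps calling this function would crash whenever the queue is empty. A minimal sketch (assuming you simply want to skip empty polls and try again) that returns None on a timeout instead:

def myfunc():
    with ServiceBusClient.from_connection_string(CONNECTION_STR) as client:
        with client.get_queue_receiver(QUEUE_NAME, session_id=session_id, max_wait_time=5) as receiver:
            # next() with a default avoids StopIteration when the wait times out
            msg = next(receiver, None)
            if msg is None:
                return None
            themsg = json.loads(str(msg))
            # complete the message so that it is removed from the queue
            receiver.complete_message(msg)
            return themsg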
To answer your question:
while True:
    result = json.dumps(myfunc())
    rdd = sc.parallelize([result])
    # You should use rdd.toDF() here instead of spark.read.json(rdd)
    spark.read.json(rdd) \
        .write.mode("overwrite").json('/mnt/lake/RAW/FormulaClassification/F1Area/')
Keep in mind that the output file names aren't consistent and you might not want them to be overwritten.
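If you would rather keep every batch (an assumption about what you want downstream, not something stated in the question), one option is to switch the write mode from overwrite to append. A sketch combining that with the myfunc variant above that returns None on a timeout:

while True:
    result = myfunc()
    if result is None:
        continue  # the wait timed out with no messages; poll again
    rdd = sc.parallelize([json.dumps(result)])
    # append adds new part files under the path instead of replacing the previous ones
    spark.read.json(rdd) \
        .write.mode("append").json('/mnt/lake/RAW/FormulaClassification/F1Area/')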
Alternatively, you should look into writing your own Source / SparkDataStream class that defines a SparkSQL source, so that you don't need a loop in your main method and the polling is handled natively by Spark.