How to Compile a While Loop statement in PySpark on Apache Spark with Databricks

Time:02-27

I'm trying to send data to my Data Lake with a while loop.

Basically, the intention is to loop through the code continually and send data to my Data Lake whenever a message is received from my Azure Service Bus, using the following code.

This code receives a message from my Service Bus:

import json
from azure.servicebus import ServiceBusClient

def myfunc():
    with ServiceBusClient.from_connection_string(CONNECTION_STR) as client:
        # max_wait_time specifies how long the receiver should wait with no incoming messages before stopping receipt.
        # Default is None; to receive forever.
        with client.get_queue_receiver(QUEUE_NAME, session_id=session_id, max_wait_time=5) as receiver:
            for msg in receiver:
                # print("Received: " + str(msg))
                themsg = json.loads(str(msg))
                # complete the message so that the message is removed from the queue
                receiver.complete_message(msg)
                return themsg

This code assigns the message to a variable:

result = myfunc()

The following code sends the message to my Data Lake:

rdd = sc.parallelize([json.dumps(result)])
spark.read.json(rdd) \
  .write.mode("overwrite").json('/mnt/lake/RAW/FormulaClassification/F1Area/')

I would like help looping through the code so that it continually checks for messages and sends the results to my Data Lake.

I believe this can be done with a while loop, but I'm not sure.

CodePudding user response:

Just because you're using Spark doesn't mean you cannot loop.

First of all, you're only returning the first message from your receiver, so it should look like this:

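with client.get_queue_receiver(QUEUE_NAME, session_id=session_id, max_wait_time=5) as receiver:
    # next() raises StopIteration if nothing arrives within max_wait_time,
    # so pass a default and return None in that case
    msg = next(receiver, None)
    if msg is None:
        return None

    # print("Received: " + str(msg))
    themsg = json.loads(str(msg))
    # complete the message (the message object, not its string form)
    # so that the message is removed from the queue
    receiver.complete_message(msg)
    return themsg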
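
If the goal is to handle every message rather than only the first, one option is to turn the receive step into a generator, so the caller decides when to stop. This is only a sketch: `iter_messages` is a hypothetical helper name, not part of the Azure SDK, and `receiver` is assumed to behave like the object returned by `get_queue_receiver` (iterable, with a `complete_message` method).

```python
import json


def iter_messages(receiver):
    """Yield each message body from `receiver` as a parsed JSON object.

    `receiver` is assumed to be iterable and to expose complete_message(),
    as the receiver returned by get_queue_receiver() does.
    """
    for msg in receiver:
        payload = json.loads(str(msg))
        # Complete the message once it has parsed, so that it is removed
        # from the queue before the payload is handed to the caller.
        receiver.complete_message(msg)
        yield payload
```

Note that in this sketch the message is completed before the caller has written it anywhere; if a failed write should leave the message on the queue, the `complete_message` call would need to move after the write.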

To answer your question,

while True:
    message = myfunc()
    if message is None:
        # No message arrived within max_wait_time; poll again
        continue
    result = json.dumps(message)

    rdd = sc.parallelize([result])
    # You should use rdd.toDF().json here instead
    spark.read.json(rdd) \
      .write.mode("overwrite").json('/mnt/lake/RAW/FormulaClassification/F1Area/')

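Keep in mind that the output file names aren't consistent, and you might not want them to be overwritten: mode("overwrite") replaces the contents of the target directory on every iteration, so only the most recent message is kept. Use mode("append") if you want to keep every message.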
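
To illustrate the point about overwriting, here is a rough sketch of the same loop with the receive and write steps passed in as functions. `poll_and_write`, `receive`, `write`, `idle_sleep` and `max_polls` are all hypothetical names chosen for this example; in the notebook, `receive` would be `myfunc` and `write` would wrap the Spark call using append mode.

```python
import json
import time


def poll_and_write(receive, write, idle_sleep=1.0, max_polls=None):
    """Call receive() repeatedly and hand each message to write() as JSON text.

    receive:   callable returning a parsed message, or None if nothing arrived
    write:     callable taking one JSON string
    max_polls: stop after this many polls; None means loop forever
    Returns the number of messages written.
    """
    written = 0
    polls = 0
    while max_polls is None or polls < max_polls:
        polls += 1
        message = receive()
        if message is None:
            # Nothing received this round; wait briefly before polling again
            time.sleep(idle_sleep)
            continue
        write(json.dumps(message))
        written += 1
    return written


# Possible wiring in a Databricks notebook (assumes sc, spark and myfunc exist):
#
# def write_to_lake(line):
#     df = spark.read.json(sc.parallelize([line]))
#     df.write.mode("append").json('/mnt/lake/RAW/FormulaClassification/F1Area/')
#
# poll_and_write(myfunc, write_to_lake)
```

Passing the two steps in as functions keeps the loop itself free of Spark and Service Bus details, which also makes it easy to try out with stand-in functions before running it against the real queue.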

Alternatively, you should look into writing your own Source / SparkDataStream class that defines Spark SQL sources, so that you don't need a loop in your main method and the polling is handled natively by Spark.
