The following function receives a message from a Service Bus queue and returns the parsed JSON result.
import json

from azure.servicebus import ServiceBusClient

def myfunc():
    with ServiceBusClient.from_connection_string(CONNECTION_STR) as client:
        # max_wait_time specifies how long the receiver should wait with no incoming
        # messages before stopping receipt. Default is None, i.e. receive forever.
        with client.get_queue_receiver(QUEUE_NAME, session_id=session_id, max_wait_time=5) as receiver:
            for msg in receiver:
                # print("Received: " + str(msg))
                themsg = json.loads(str(msg))
                # complete the message so that it is removed from the queue
                receiver.complete_message(msg)
                return themsg

result = myfunc()
The following is a snippet of the JSON output.
Out[65]: {'name': 'dmMapping_DIM_WORK_ORDER',
'description': 'DIM_WORK_ORDER Azure Foundation to Azure Data Mart Mapping',
'version': '2.4',
'updateDttm': '01/02/2022 14:46PM',
'SCDType': 4,
'mappings': [{'ELLIPSE': {'method': 'ellipseItem',
'tables': [{'database': 'foundation',
'schema': 'AZ_FH_ELLIPSE',
'table': 'ADS_FND_MSF620',
'primaryKey': [{'column': 'WORK_ORDER'}]}],
'columns': [{'column': 'D_WORK_ORDER_KEY',
'type': 'int',
'allowNulls': 'No',
'mapType': 'autoGenerate'},
{'column': 'SYSTEM_OF_RECORD',
'type': 'varchar',
'length': 24,
'allowNulls': 'No',
'mapType': 'staticValue',
'value': 'ELLIPSE'},
{'column': 'ACTUAL_FINISH_DATE',
When I attempt to save the output with the following:
result.write.save().json('/mnt/lake/RAW/FormulaClassification/F1Area/')
I get the error:
AttributeError: 'dict' object has no attribute 'write'
Can someone let me know how to overcome this error?
CodePudding user response:
The simplest way is to just write the data as JSON, without using Spark:
with open("/dbfs/mnt/lake/RAW/FormulaClassification/F1Area/<file-name>", "w") as file:
    file.write(json.dumps(result))
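If you want to confirm the write, you can read the file straight back with the standard library. A minimal sketch, assuming the same <file-name> used above:

# read the file back and check a known field from the message
with open("/dbfs/mnt/lake/RAW/FormulaClassification/F1Area/<file-name>") as file:
    roundtrip = json.load(file)

print(roundtrip["name"])  # expect 'dmMapping_DIM_WORK_ORDER'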
You can still use the Spark API, but for a single message it would be overkill:
rdd = sc.parallelize([json.dumps(result)])
spark.read.json(rdd) \
    .write.mode("append").json('/mnt/lake/RAW/FormulaClassification/F1Area/')
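Either way, the data ends up as JSON files under the mount point, so it can be read back with Spark later. A minimal sketch, assuming the same output path:

# read everything written to the output directory and inspect a couple of fields
df = spark.read.json('/mnt/lake/RAW/FormulaClassification/F1Area/')
df.select("name", "version").show(truncate=False)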