Databricks Apache Spark AttributeError: 'dict' object has no attribute 'write'

Time:02-15

The following function receives a message from an Azure Service Bus queue and returns it as parsed JSON.

import json

from azure.servicebus import ServiceBusClient

def myfunc():
    with ServiceBusClient.from_connection_string(CONNECTION_STR) as client:
        # max_wait_time specifies how long the receiver should wait with no incoming messages before stopping receipt.
        # Default is None; to receive forever.
        with client.get_queue_receiver(QUEUE_NAME, session_id=session_id, max_wait_time=5) as receiver:
            for msg in receiver:
                # print("Received: " + str(msg))
                themsg = json.loads(str(msg))
                # complete the message so that the message is removed from the queue
                receiver.complete_message(msg)
                # only the first message is returned; None is returned if no message arrives
                return themsg

result = myfunc()

The following is a snippet of the output.

Out[65]: {'name': 'dmMapping_DIM_WORK_ORDER',
 'description': 'DIM_WORK_ORDER Azure Foundation to Azure Data Mart Mapping',
 'version': '2.4',
 'updateDttm': '01/02/2022 14:46PM',
 'SCDType': 4,
 'mappings': [{'ELLIPSE': {'method': 'ellipseItem',
    'tables': [{'database': 'foundation',
      'schema': 'AZ_FH_ELLIPSE',
      'table': 'ADS_FND_MSF620',
      'primaryKey': [{'column': 'WORK_ORDER'}]}],
    'columns': [{'column': 'D_WORK_ORDER_KEY',
      'type': 'int',
      'allowNulls': 'No',
      'mapType': 'autoGenerate'},
     {'column': 'SYSTEM_OF_RECORD',
      'type': 'varchar',
      'length': 24,
      'allowNulls': 'No',
      'mapType': 'staticValue',
      'value': 'ELLIPSE'},
     {'column': 'ACTUAL_FINISH_DATE',

When I attempt to save the output with the following:

result.write.save().json('/mnt/lake/RAW/FormulaClassification/F1Area/')

I get the error:

AttributeError: 'dict' object has no attribute 'write'

Can someone let me know how to overcome this error?

CodePudding user response:

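The result is a plain Python dict (the return value of json.loads), not a Spark DataFrame, so it has no write attribute. The simplest way is to write the data as JSON yourself, without using Spark: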

with open("/dbfs/mnt/lake/RAW/FormulaClassification/F1Area/<file-name>", "w") as file:
  file.write(json.dumps(result))
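
If you save messages like this in more than one place, the file-based approach above could be wrapped in a small helper. This is only a sketch: the function name save_result_as_json and its parameters are made up for illustration, and it assumes the target directory is reachable as a local path (on Databricks, through the /dbfs/ prefix).

```python
import json
import os


def save_result_as_json(result, directory, file_name):
    """Write a dict to <directory>/<file_name> as a single JSON document.

    Hypothetical helper, not part of any library.
    """
    # Create the target directory if it does not exist yet.
    os.makedirs(directory, exist_ok=True)
    path = os.path.join(directory, file_name)
    with open(path, "w", encoding="utf-8") as file:
        # json.dump serializes straight into the open file handle.
        json.dump(result, file, indent=2)
    return path
```

It would be called as save_result_as_json(result, "/dbfs/mnt/lake/RAW/FormulaClassification/F1Area/", "<file-name>"), with a file name of your choice.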

You can still use the Spark API, but for a single message that would be overkill:

rdd = sc.parallelize([json.dumps(result)])
spark.read.json(rdd) \
  .write.mode("append").json('/mnt/lake/RAW/FormulaClassification/F1Area/')
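
If you end up collecting several messages before saving, one option that still avoids Spark is to write them as line-delimited JSON, one object per line. By default spark.read.json expects exactly that layout, so the file can be loaded into a DataFrame later. The sketch below is an assumption about how you might do it; write_json_lines and read_json_lines are hypothetical names.

```python
import json


def write_json_lines(messages, path):
    """Write a list of dicts to path, one JSON object per line."""
    with open(path, "w", encoding="utf-8") as file:
        for message in messages:
            # json.dumps without indent emits no newlines, so each
            # message occupies exactly one line of the file.
            file.write(json.dumps(message) + "\n")
    return len(messages)


def read_json_lines(path):
    """Read the file back into a list of dicts, skipping blank lines."""
    with open(path, encoding="utf-8") as file:
        return [json.loads(line) for line in file if line.strip()]
```

The file is written in one pass rather than appended to, so a rerun replaces the previous contents.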