Save Pandas or Pyspark dataframe from Databricks to Azure Blob Storage


Is there a way I can save a PySpark or pandas dataframe from Databricks to Azure Blob Storage without mounting the container or installing libraries?

I was able to achieve this after mounting the storage container into Databricks and using the library com.crealytics.spark.excel, but I was wondering if I can do the same without the library and without mounting, because I will be working on clusters that don't have these two permissions.

CodePudding user response:

Here is the code for saving the dataframe locally to DBFS.

# export 
from os import path

folder = "export"
name = "export"
file_path_name_on_dbfs = path.join("/tmp", folder, name)

# Writing to DBFS
# .coalesce(1) is used to generate a single output file; if the dataframe is too big this won't work, so you'll get multiple part files and need to copy them later one by one
sampleDF \
  .coalesce(1) \
      .write \
      .mode("overwrite") \
      .option("header", "true") \
      .option("delimiter", ";") \
      .option("encoding", "UTF-8") \
      .csv(file_path_name_on_dbfs)

# path of the destination file, which will be sent to Azure storage
dest = file_path_name_on_dbfs + ".csv"

# Renaming part-000...csv to our file name
target_file = list(filter(lambda file: file.name.startswith("part-00000"), dbutils.fs.ls(file_path_name_on_dbfs)))
if len(target_file) > 0:
  dbutils.fs.mv(target_file[0].path, dest)
  dbutils.fs.cp(dest, f"file://{dest}") # needed on Community Edition only: /dbfs is not mounted locally, so we copy the file to the driver's local filesystem
  dbutils.fs.rm(file_path_name_on_dbfs,True)
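
Before uploading, you can optionally check that the renamed file really exists on DBFS (a small sanity-check sketch reusing the dest variable from above):

# Optional sanity check: dbutils.fs.ls raises an error if the path does not exist
exported = dbutils.fs.ls(dest)
print(exported[0].path, exported[0].size, "bytes")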

The code that sends the file to Azure storage:

import requests

sas="YOUR_SAS_TOKEN_PREVIOUSLY_CREATED" # follow the link below to create SAS token (using sas is slightly more secure than raw key storage) 
blob_account_name = "YOUR_BLOB_ACCOUNT_NAME"
container = "YOUR_CONTAINER_NAME"
destination_path_w_name = "export/export.csv"
url = f"https://{blob_account_name}.blob.core.windows.net/{container}/{destination_path_w_name}?{sas}"

# here we read the content of our previously exported df -> csv
# if you are not on Community Edition you might want to use /dbfs + dest
payload = open(dest).read()

headers = {
  'x-ms-blob-type': 'BlockBlob',
  'Content-Type': 'text/csv' # you can change the content type according to your needs
}

response = requests.request("PUT", url, headers=headers, data=payload)

# a 201 status code means your file was created successfully
print(response.status_code)
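
If you prefer the notebook to fail loudly instead of only printing the status code, a minimal check could look like this (sketch):

# Fail fast if the blob was not created (Azure returns 201 Created on success)
if response.status_code != 201:
    raise Exception(f"Upload failed with status {response.status_code}: {response.text}")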

Follow this link to set up a SAS token.

Remember that anyone who obtains the SAS token can access your storage, depending on the permissions you set when creating it.
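
If your dataframe is already a pandas dataframe, the Spark write and rename steps above are not needed; a minimal sketch (assuming a pandas dataframe named pdf and the same sas, blob_account_name and container variables as above) writes the CSV to the driver's local disk and reuses the same PUT request:

import requests

# pandas variant: write the CSV locally on the driver, then PUT it to blob storage
local_path = "/tmp/export.csv"  # hypothetical local path on the driver
pdf.to_csv(local_path, sep=";", index=False, encoding="utf-8")

url = f"https://{blob_account_name}.blob.core.windows.net/{container}/export/export.csv?{sas}"
with open(local_path, "rb") as f:
    response = requests.put(
        url,
        headers={"x-ms-blob-type": "BlockBlob", "Content-Type": "text/csv"},
        data=f.read()
    )
print(response.status_code)  # 201 means the blob was created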

Code for the Excel export version (using com.crealytics:spark-excel_2.12:0.14.0)

Saving the dataframe:

data = [
  ('a', 25, 'ast'),
  ('b', 15, 'phone'),
  ('c', 32, 'dlp'),
  ('d', 45, 'rare'),
  ('e', 60, 'phq')
]
columns = ["column1", "column2", "column3"]
sampleDF = spark.createDataFrame(data=data, schema=columns)
sampleDF.show()

# export 
from os import path
folder = "export"
name = "export"
file_path_name_on_dbfs = path.join("/tmp", folder, name)

# Writing to DBFS
sampleDF.write.format("com.crealytics.spark.excel")\
  .option("header", "true")\
  .mode("overwrite")\
  .save(file_path_name_on_dbfs + ".xlsx")

# path of the exported Excel file
dest = file_path_name_on_dbfs + ".xlsx"
dbutils.fs.cp(dest, f"file://{dest}") # needed on Community Edition only: /dbfs is not mounted locally, so we copy the file to the driver's local filesystem

Uploading the file to Azure storage:

import requests

sas="YOUR_SAS_TOKEN_PREVIOUSLY_CREATED" # follow the link below to create SAS token (using sas is slightly more secure than raw key storage) 
blob_account_name = "YOUR_BLOB_ACCOUNT_NAME"
container = "YOUR_CONTAINER_NAME"
destination_path_w_name = "export/export.xlsx"
# destination_path_w_name = "export/export.csv"
url = f"https://{blob_account_name}.blob.core.windows.net/{container}/{destination_path_w_name}?{sas}"

# here we read the content of our previously exported df -> xlsx
# if you are not on Community Edition you might want to use /dbfs + dest
# payload = open(dest).read()
payload = open(dest, 'rb').read()

headers = {
  'x-ms-blob-type': 'BlockBlob',
#   'Content-Type': 'text/csv'
   'Content-Type': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
}

response = requests.request("PUT", url, headers=headers, data=payload)

# a 201 status code means your file was created successfully
print(response.status_code)
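
To double-check the upload, the same SAS URL can be used to read the blob back, provided the SAS token was created with read permission (sketch):

# Read the uploaded blob back through the same SAS URL (requires read permission on the token)
check = requests.get(url)
print(check.status_code, len(check.content), "bytes downloaded")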