Pyspark - Create a json structure with all combinations based on a dataframe

Time:09-15

I have a PySpark dataframe that contains 3 columns:

  • databricksPath
  • countryPartition
  • yearPartition

I'm creating this dataframe based on values coming from widgets via Data Factory: https://i.stack.imgur.com/8zIuO.png

pyspark dataframe: https://i.stack.imgur.com/ZcjZO.png

From this dataframe I want to build an output containing every combination of the values, as a JSON structure that I can return to ADF and use in a ForEach activity. At the moment I return the values with this command: dbutils.notebook.exit({'message': 'Success', 'databricksPath': databricksPath,'yearPartition': yearPartition,'countryPartition': countryPartition})

Output example:

"output": {
                "value": [
                    {
                        "country": "PT",
                        "year": "2022",
                        "databricksPath": "/notebooks/1.Project/Notebook_1"
                    },
                    {
                        "country": "ES",
                        "year": "2022",
                        "databricksPath": "/notebooks/1.Project/Notebook_1"
                    },
                    {
                        "country": "IT",
                        "year": "2022",
                        "databricksPath": "/notebooks/1.Project/Notebook_1"
                    },
                    {
                        "country": "BE",
                        "year": "2022",
                        "databricksPath": "/notebooks/1.Project/Notebook_1"
                    },
                    {
                        "country": "PT",
                        "year": "2022",
                        "databricksPath": "/notebooks/1.Project/Notebook_2"
                    },
                    {
                        "country": "ES",
                        "year": "2022",
                        "databricksPath": "/notebooks/1.Project/Notebook_2"
                    },
                    {
                        "country": "IT",
                        "year": "2022",
                        "databricksPath": "/notebooks/1.Project/Notebook_2"
                    },
                    {
                        "country": "BE",
                        "year": "2022",
                        "databricksPath": "/notebooks/1.Project/Notebook_2"
                    }
                ]
}

Notebook that I'm using:

# Databricks notebook source
from pyspark.sql import functions as F 
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StringType
from datetime import datetime, timedelta
from pyspark.sql.functions import col, lit, row_number, instr, expr, when, current_date, months_between, coalesce, concat_ws, sum as Sum, first, round, monotonically_increasing_id, date_format, concat, substring, count
from pyspark.sql.window import Window
from pathlib import Path
from functools import reduce
from pyspark.sql import DataFrame
import traceback
import pyodbc
import uuid
import sys


# COMMAND ----------

dbutils.widgets.text("databricksPath", "['/notebooks/1.Project/Notebook_1','/notebooks/1.Project/Notebook_2']", "databricksPath")
dbutils.widgets.text("countryPartition", "['PT','ES','IT','BE']", "countryPartition")
dbutils.widgets.text("yearPartition", "['2022']", "yearPartition")


databricksPath = dbutils.widgets.get('databricksPath')
countryPartition = dbutils.widgets.get('countryPartition')
yearPartition = dbutils.widgets.get('yearPartition')

# COMMAND ----------

from pyspark.sql.types import StructType,StructField, StringType
schema = StructType([
  StructField('databricksPath', StringType(), True),
  StructField('countryPartition', StringType(), True),
  StructField('yearPartition', StringType(), True)
  ])

data2 = [(databricksPath,countryPartition,yearPartition)]
df = spark.createDataFrame(data=data2,schema=schema)

df2 = df.withColumn("databricksPath", concat_ws(",",col("databricksPath")))

display(df2)

# COMMAND ----------

dbutils.notebook.exit({'message': 'Success', 'databricksPath': databricksPath,'yearPartition': yearPartition,'countryPartition': countryPartition})

Can anyone please help me achieve this?

Thank you!

CodePudding user response:

You can use the following code to achieve this:

dbutils.widgets.text("databricksPath", "['/notebooks/1.Project/Notebook_1','/notebooks/1.Project/Notebook_2']", "databricksPath")
dbutils.widgets.text("countryPartition", "['PT','ES','IT','BE']", "countryPartition")
dbutils.widgets.text("yearPartition", "['2022']", "yearPartition")
#dbutils.widgets.text("partitionColumn", "['dbo.table1|country', 'dbo.table2|country_year']", "partitionColumn")

databricksPath = dbutils.widgets.get('databricksPath')
countryPartition = dbutils.widgets.get('countryPartition')
yearPartition = dbutils.widgets.get('yearPartition')
#partitionColumn = dbutils.widgets.get('partitionColumn')

import ast

#creating a separate dataframe for each of the above.
#ast.literal_eval parses the list literals without executing arbitrary code, unlike eval.
path_df = spark.createDataFrame(data=[[i] for i in ast.literal_eval(databricksPath)],schema=['path'])
cp_df = spark.createDataFrame(data=[[i] for i in ast.literal_eval(countryPartition)],schema=['country'])
y_df = spark.createDataFrame(data=[[i] for i in ast.literal_eval(yearPartition)],schema=['year'])
#p_df = spark.createDataFrame(data=[[i] for i in ast.literal_eval(partitionColumn)],schema=['partition_col'])


#applying cross join to get all combination results.
from pyspark.sql.functions import broadcast
final_df= broadcast(broadcast(cp_df).crossJoin(y_df)).crossJoin(path_df)
#final_df= broadcast(broadcast(broadcast(cp_df).crossJoin(y_df)).crossJoin(path_df)).crossJoin(p_df)

#from pyspark.sql.functions import split
#fdf = final_df.select('country','year','path',split(final_df['partition_col'],'[|]').getItem(0).alias('table'),split(final_df['partition_col'],'[|]').getItem(1).alias('partition'))

#from pyspark.sql.functions import array
#fdf = fdf.withColumn('countryYear', array(col('country'),col('year')))

#get the result dataframe as a list of dictionaries
#json.loads is used instead of eval so that JSON values such as true, false and null parse correctly
import json
output = [json.loads(i) for i in final_df.toJSON().collect()]
#output = [json.loads(i) for i in fdf.toJSON().collect()]

#returning the above output as a JSON string to data factory
dbutils.notebook.exit(json.dumps(output))
  • With this code, the value of output is an array of objects (like the output example in the question):

[screenshot]

  • When I run this notebook with a Notebook activity in Azure Data Factory, it gives the following result:

[screenshot]

UPDATE: This is the output image for the updated requirement.
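
If the lists are as small as the ones in the question, the same combinations can probably be built without Spark at all. The following is only a minimal sketch under that assumption: build_combinations is a hypothetical helper name, the widget values are hard-coded as plain strings so the snippet runs outside Databricks, and the keys are named country, year and databricksPath to match the output example in the question.

```python
import ast
import json
from itertools import product


def build_combinations(paths_raw, countries_raw, years_raw):
    """Return every (path, year, country) combination as a list of dicts."""
    # Widget values arrive as strings such as "['PT','ES']"; literal_eval
    # parses Python literals without executing arbitrary code.
    paths = ast.literal_eval(paths_raw)
    countries = ast.literal_eval(countries_raw)
    years = ast.literal_eval(years_raw)
    # product() varies the last iterable fastest, so the countries cycle
    # within each path, as in the output example from the question.
    return [
        {"country": country, "year": year, "databricksPath": path}
        for path, year, country in product(paths, years, countries)
    ]


# Hard-coded stand-ins for dbutils.widgets.get(...) (assumption for this sketch).
combos = build_combinations(
    "['/notebooks/1.Project/Notebook_1','/notebooks/1.Project/Notebook_2']",
    "['PT','ES','IT','BE']",
    "['2022']",
)
payload = json.dumps(combos)
# In a Databricks notebook the string would then be returned with:
# dbutils.notebook.exit(payload)
```

In the notebook itself, the three string arguments would come from dbutils.widgets.get, and payload would be passed to dbutils.notebook.exit as in the code above. On the Data Factory side, the returned value is exposed on the Notebook activity's output as runOutput.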
