I have a PySpark DataFrame that contains 3 columns:
- databricksPath
- countryPartition
- yearPartition
I'm creating this DataFrame from values passed to widgets via Data Factory: https://i.stack.imgur.com/8zIuO.png
PySpark DataFrame: https://i.stack.imgur.com/ZcjZO.png
From this DataFrame I want to build an output containing all the combinations, i.e. a JSON structure returned to ADF with a command like dbutils.notebook.exit({'message': 'Success', 'databricksPath': databricksPath, 'yearPartition': yearPartition, 'countryPartition': countryPartition}), so that I can use it in a ForEach activity.
Output example:
"output": {
"value": [
{
"country": "PT",
"year": "2022",
"databricksPath": "/notebooks/1.Project/Notebook_1"
},
{
"country": "ES",
"year": "2022",
"databricksPath": "/notebooks/1.Project/Notebook_1"
},
{
"country": "IT",
"year": "2022",
"databricksPath": "/notebooks/1.Project/Notebook_1"
},
{
"country": "BE",
"year": "2022",
"databricksPath": "/notebooks/1.Project/Notebook_1"
},
{
"country": "PT",
"year": "2022",
"databricksPath": "/notebooks/1.Project/Notebook_2"
},
{
"country": "ES",
"year": "2022",
"databricksPath": "/notebooks/1.Project/Notebook_2"
},
{
"country": "IT",
"year": "2022",
"databricksPath": "/notebooks/1.Project/Notebook_2"
},
{
"country": "BE",
"year": "2022",
"databricksPath": "/notebooks/1.Project/Notebook_2"
}
]
}
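In other words, what I want is the Cartesian product of the three lists. As a plain-Python sketch of the same idea (ignoring Spark, and assuming the lists have already been parsed out of the widget strings):

from itertools import product

paths = ['/notebooks/1.Project/Notebook_1', '/notebooks/1.Project/Notebook_2']
countries = ['PT', 'ES', 'IT', 'BE']
years = ['2022']

# Every (path, country, year) combination, shaped like the output above.
value = [{'country': c, 'year': y, 'databricksPath': p}
         for p, c, y in product(paths, countries, years)]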
Notebook that I'm using:
# Databricks notebook source
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StringType
from datetime import datetime, timedelta
from pyspark.sql.functions import col, lit, row_number, instr, expr, when, current_date, months_between, coalesce, concat_ws, sum as Sum, first, round, monotonically_increasing_id, date_format, concat, substring, count
from pyspark.sql.window import Window
from pathlib import Path
from functools import reduce
from pyspark.sql import DataFrame
import traceback
import pyodbc
import uuid
import sys
# COMMAND ----------
dbutils.widgets.text("databricksPath", "['/notebooks/1.Project/Notebook_1','/notebooks/1.Project/Notebook_2']", "databricksPath")
dbutils.widgets.text("countryPartition", "['PT','ES','IT','BE']", "countryPartition")
dbutils.widgets.text("yearPartition", "['2022']", "yearPartition")
databricksPath = dbutils.widgets.get('databricksPath')
countryPartition = dbutils.widgets.get('countryPartition')
yearPartition = dbutils.widgets.get('yearPartition')
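# Note: the widget values above come back as plain strings (e.g. "['PT','ES','IT','BE']"), not lists.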
# COMMAND ----------
from pyspark.sql.types import StructType, StructField, StringType
schema = StructType([
StructField('databricksPath', StringType(), True),
StructField('countryPartition', StringType(), True),
StructField('yearPartition', StringType(), True)
])
data2 = [(databricksPath,countryPartition,yearPartition)]
df = spark.createDataFrame(data=data2,schema=schema)
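# At this point df has a single row whose columns hold the raw widget strings, not one row per value.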
df2 = df.withColumn("databricksPath", concat_ws(",",col("databricksPath")))
display(df2)
# COMMAND ----------
dbutils.notebook.exit({'message': 'Success', 'databricksPath': databricksPath,'yearPartition': yearPartition,'countryPartition': countryPartition})
Can anyone please help me achieve this?
Thank you!
CodePudding user response:
You can use the following code to achieve this:
dbutils.widgets.text("databricksPath", "['/notebooks/1.Project/Notebook_1','/notebooks/1.Project/Notebook_2']", "databricksPath")
dbutils.widgets.text("countryPartition", "['PT','ES','IT','BE']", "countryPartition")
dbutils.widgets.text("yearPartition", "['2022']", "yearPartition")
#dbutils.widgets.text("partitionColumn", "['dbo.table1|country', 'dbo.table2|country_year']", "partitionColumn")
databricksPath = dbutils.widgets.get('databricksPath')
countryPartition = dbutils.widgets.get('countryPartition')
yearPartition = dbutils.widgets.get('yearPartition')
#partitionColumn = dbutils.widgets.get('partitionColumn')
# Creating a separate DataFrame for each of the widget values above.
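# Note: eval assumes each widget value is a Python-style list literal, as in the defaults above
# (ast.literal_eval would be a stricter alternative for untrusted input).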
path_df = spark.createDataFrame(data=[[i] for i in eval(databricksPath)], schema=['path'])
cp_df = spark.createDataFrame(data=[[i] for i in eval(countryPartition)], schema=['country'])
y_df = spark.createDataFrame(data=[[i] for i in eval(yearPartition)], schema=['year'])
#p_df = spark.createDataFrame(data=[[i] for i in eval(dbutils.widgets.get('partitionColumn'))],schema=['partition_col'])
# Applying cross joins to get every combination.
from pyspark.sql.functions import broadcast
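# The broadcast hints mark these tiny DataFrames as broadcastable so the cross joins avoid a shuffle.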
final_df= broadcast(broadcast(cp_df).crossJoin(y_df)).crossJoin(path_df)
#final_df= broadcast(broadcast(broadcast(cp_df).crossJoin(y_df)).crossJoin(path_df)).crossJoin(p_df)
#from pyspark.sql.functions import split
#fdf = final_df.select('country','year','path',split(final_df['partition_col'],'[|]').getItem(0).alias('table'),split(final_df['partition_col'],'[|]').getItem(1).alias('partition'))
#from pyspark.sql.functions import array
#fdf = fdf.withColumn('countryYear', array(col('country'),col('year')))
# Collect the result DataFrame as a list of dictionaries
# (json.loads parses the JSON strings emitted by toJSON).
import json
output = [json.loads(i) for i in final_df.toJSON().collect()]
#output = [json.loads(i) for i in fdf.toJSON().collect()]
# Return the output list to Data Factory
# (json.dumps makes it a valid JSON string that ADF can parse).
dbutils.notebook.exit(json.dumps(output))
- Using this code, the value of output will be an array of objects (like the output example in the question).
- When I run this notebook with a Notebook activity in Azure Data Factory, it gives the expected result.
UPDATE: the commented-out partitionColumn lines above extend the same approach to the updated requirement (table/partition-column pairs).
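- To use the result in a ForEach activity, the Items field can parse the returned string with an expression along the lines of @json(activity('Notebook1').output.runOutput) (substituting the name of your Notebook activity); inside the loop each element is then available as item().country, item().year and item().databricksPath.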