PySpark - Append two JSON structures


I'm creating a Databricks notebook in PySpark, and I'm trying to build a dynamic JSON (TMSL) to process a cube in Analysis Services.

In the first part of the code I define the partitions and the fact tables. Whatever partitions I pass in, the code dynamically builds a JSON list, and that part works correctly:

json output 1: {"table": "Analysis Financial", "partition": "IT_2022"}, {"table": "Analysis Financial", "partition": "ES_2022"}, {"table": "Analysis Financial", "partition": "BE_2022"}, {"table": "Analysis Financial", "partition": "PT_2022"}, {"table": "Analysis Product", "partition": "IT"}, {"table": "Analysis Product", "partition": "ES"}, {"table": "Analysis Product", "partition": "PT"}, {"table": "Analysis Product", "partition": "BE"}
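For readers without a Spark session, the partition list that the cross join produces can be sketched in plain Python with itertools.product. This is a swapped-in technique for illustration, not the notebook's code: the widget values are copied from the notebook, and the helper name build_partitions is hypothetical. The item order may differ from the Spark output, which has no guaranteed order after distinct().

```python
import itertools
import json

# Widget values copied from the notebook (JSON arrays stored as strings).
country_partition = '["PT","IT","ES","BE"]'
year_partition = '["2022"]'
partition_column = '["Analysis Financial|country_year","Analysis Product|country"]'


def build_partitions(countries, years, specs):
    """Hypothetical helper mirroring the cross join + when/otherwise logic."""
    seen = set()
    result = []
    for country, year, spec in itertools.product(countries, years, specs):
        table, kind = spec.split("|")
        # Country-partitioned tables get "IT"; country_year ones get "IT_2022".
        partition = country if kind == "country" else f"{country}_{year}"
        key = (table, partition)
        if key not in seen:  # plays the role of .distinct() in the notebook
            seen.add(key)
            result.append({"table": table, "partition": partition})
    return result


output = build_partitions(
    json.loads(country_partition),
    json.loads(year_partition),
    json.loads(partition_column),
)
for item in output:
    print(item)
```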

Then I want to append another JSON that contains the dimension tables. This part is static, because those tables do not need to be processed by partition:


I then tried to append both, but the output is not correct:

{"MaxParallelism": 1, "Objects": [{"table": "Analysis Measures"}, {"table": "Bank"}, {"table": "Channel"}], "RetryCount": 2, "Type": "Full"}, {"partition": "IT_2022", "table": "Analysis Financial"}, {"partition": "ES_2022", "table": "Analysis Financial"}, {"partition": "BE_2022", "table": "Analysis Financial"}, {"partition": "PT_2022", "table": "Analysis Financial"}, {"partition": "IT", "table": "Analysis Product"}, {"partition": "ES", "table": "Analysis Product"}, {"partition": "PT", "table": "Analysis Product"}, {"partition": "BE", "table": "Analysis Product"}
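The output above is a list of nine sibling records (the refresh settings followed by the eight partition entries) rather than one object with the partitions inside "Objects". The alphabetical key order is consistent with spark.read.json, which sorts the fields of an inferred schema by name. The snippet below reproduces the flat shape in plain Python with a list concatenation; the data is copied from the question, and the concatenation is only an assumption about what the attempt effectively did, since the code for that step is not shown.

```python
# Data copied from the question.
b = {
    "Type": "Full",
    "MaxParallelism": 1,
    "RetryCount": 2,
    "Objects": [{"table": "Analysis Measures"}, {"table": "Bank"}, {"table": "Channel"}],
}
output = [
    {"table": "Analysis Financial", "partition": p}
    for p in ["IT_2022", "ES_2022", "BE_2022", "PT_2022"]
] + [
    {"table": "Analysis Product", "partition": p}
    for p in ["IT", "ES", "PT", "BE"]
]

# Placing the two structures side by side keeps them as separate top-level records.
side_by_side = [b] + output
print(len(side_by_side))  # 9, like the output above

# The fact-table partitions never end up inside b["Objects"].
print(len(b["Objects"]))  # 3
```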

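The output I want in this case is a single structure whose "Objects" list contains both the dimension tables and the fact-table partitions, along these lines: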

         "table":"Analysis Measures",
         "table":"Analysis Financial",
         "table":"Analysis Financial",
         "table":"Analysis Financial",
         "table":"Analysis Financial",
         "table":"Analysis Product",
         "table":"Analysis Product",
         "table":"Analysis Product",
         "table":"Analysis Product"

Notebook that I'm using:

# Databricks notebook source
from pyspark.sql.types import * 
from pyspark.sql.functions import * 
# from pyspark.sql.functions.sequence import *
from pyspark.sql.window import Window
from datetime import datetime, timedelta
from pathlib import Path
from functools import reduce
import traceback
import pyodbc
import uuid
import sys

spark.conf.set("spark.databricks.delta.targetFileSize", 33554432)
spark.conf.set("spark.databricks.io.cache.enabled", "true")

# COMMAND ----------

dbutils.widgets.text("cubeName", "Consumer", "cubeName")
dbutils.widgets.text("countryPartition",  '["PT","IT","ES","BE"]', "countryPartition")
dbutils.widgets.text("yearPartition",  '["2022"]', "yearPartition")
dbutils.widgets.text("partitionColumn", '["Analysis Financial|country_year","Analysis Product|country"]', "partitionColumn")

cubeName = dbutils.widgets.get('cubeName')
countryPartition = dbutils.widgets.get('countryPartition')
yearPartition = dbutils.widgets.get('yearPartition')
partitionColumn = dbutils.widgets.get('partitionColumn')


# COMMAND ----------

import json

# The widgets hold JSON arrays, so parse them with json.loads instead of eval
cp_df = spark.createDataFrame(data=[[c] for c in json.loads(countryPartition)], schema=['country'])
y_df = spark.createDataFrame(data=[[y] for y in json.loads(yearPartition)], schema=['year'])
p_df = spark.createDataFrame(data=[[p] for p in json.loads(partitionColumn)], schema=['partition_col'])

# Cross join to get every country/year/table combination (the inputs are tiny, so broadcast them)
from pyspark.sql.functions import broadcast
final_df = broadcast(broadcast(cp_df).crossJoin(y_df)).crossJoin(p_df)

from pyspark.sql.functions import split

# Split "Analysis Financial|country_year" into the table name and its partitioning scheme
parts = split(final_df['partition_col'], '[|]')
fdf = final_df.select('country', 'year', parts.getItem(0).alias('table'), parts.getItem(1).alias('partitionColumn')).distinct()

# Partition name: "IT" for tables partitioned by country, "IT_2022" for tables partitioned by country_year
fdf = fdf.withColumn('partition', when(col('partitionColumn') == "country", col('country')).otherwise(concat(col('country'), lit('_'), col('year'))))

fdf = fdf.select('table', 'partition').distinct()
# display(fdf)

# Collect the small result to the driver as plain dicts, e.g. {'table': 'Analysis Product', 'partition': 'IT'}
output = [row.asDict() for row in fdf.collect()]

# COMMAND ----------

b = {
    "Type": "Full",
    "MaxParallelism": 1,
    "RetryCount": 2,
    "Objects": [
        {"table": "Analysis Measures"},
        {"table": "Bank"},
        {"table": "Channel"}
    ]
}

# COMMAND ----------

json_content1 = b
json_content2 = output

json_list = []

df = spark.read.json(sc.parallelize(json_list))
# display(df)

df = [eval(i) for i in df.toJSON().collect()]

import json

Can anyone please help me achieve this?

Thank you!

Answer:

Please check this code: to append the two JSON structures, extend the "Objects" list of the first structure with the second one using ['Objects'].extend, and you will get the required output.

json_content1 = b
json_content2 = output

# Append the partition entries after the dimension tables inside "Objects"
json_content1['Objects'].extend(json_content2)
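A variant of the answer that leaves the template b untouched, so the notebook cell can be re-run without appending the partitions a second time, might look like the sketch below. The helper name build_refresh_body is hypothetical, and the sample output list is a two-item subset of the question's data. The field names (Type, MaxParallelism, RetryCount, Objects) are taken from the question; they appear to match the shape of the Azure Analysis Services asynchronous refresh REST API request body, but check that against the service you call.

```python
import copy
import json


def build_refresh_body(template, partition_objects):
    """Hypothetical helper: return a new body whose "Objects" list holds the
    template's dimension tables followed by the partition entries."""
    body = copy.deepcopy(template)  # never mutate the shared template
    body["Objects"].extend(partition_objects)
    return body


b = {
    "Type": "Full",
    "MaxParallelism": 1,
    "RetryCount": 2,
    "Objects": [{"table": "Analysis Measures"}, {"table": "Bank"}, {"table": "Channel"}],
}
# Sample subset of the question's partition list.
output = [
    {"table": "Analysis Financial", "partition": "IT_2022"},
    {"table": "Analysis Product", "partition": "IT"},
]

body = build_refresh_body(b, output)
print(json.dumps(body, indent=2))  # JSON string to send as the request payload
```

Because the copy is built fresh each time, calling build_refresh_body twice yields the same body, whereas calling extend on b directly grows it on every run.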
