My knowledge of pyspark is quite limited at this point, so I'm looking for a quick solution to this one issue I have with my current implementation. I'm attempting to read a JSON file via pyspark into a dataframe, convert it into an object which I can insert into a database table (DynamoDB). The columns in the table should be representative of the keys specified in the JSON file. For example, if my JSON file comprises of the following elements:
{
"Records":[
{
"column1":"Value1",
"column2":"Value2",
"column3":"Value3",
"column4":{
"sub1":"Value4",
"sub2":"Value5",
"sub3":{
"sub4":"Value6",
"sub5":"Value7"
}
}
},
{
"column1":"Value8",
"column2":"Value9",
"column3":"Value10",
"column4":{
"sub1":"Value11",
"sub2":"Value12",
"sub3":{
"sub4":"Value13",
"sub5":"Value14"
}
}
}
]
}
The columns in the database table are column1, column2, column3 and column4 respectively. In the case of column4, which is of Map type, I need the entire object to be converted to a string before it is inserted into the database. Hence, in the case of the first row, I can expect to see this for that column:
{
"sub1":"Value4",
"sub2":"Value5",
"sub3":{
"sub4":"Value6",
"sub5":"Value7"
}
}
However, this is what I am seeing in the database table after running my script:
{ Value4, Value5, { Value6, Value7 }}
I understand this is happening because something needs to be done prior to converting all column values to type String before performing the DB insertion operation:
for col in Rows.columns:
Rows = Rows.withColumn(col, Rows[col].cast(StringType()))
I'm looking for a way to rectify the contents of Column4 to represent the original JSON object before converting them to the type String. Here is what I've written so far (DB insertion operation excluded)
import pyspark.sql.types as T
from pyspark.sql import functions as SF
df = spark.read.option("multiline", "true").json('/home/abhishek.tirkey/Documents/test')
Records = df.withColumn("Records", SF.explode(SF.col("Records")))
Rows = Records.select(
"Records.column1",
"Records.column2",
"Records.column3",
"Records.column4",
)
for col in Rows.columns:
Rows = Rows.withColumn(col, Rows[col].cast(StringType()))
RowsJSON = Rows.toJSON()
CodePudding user response:
there is a to_json
function to do that :
from pyspark.sql import functions as F
df = df.withColumn("record", F.explode("records")).select(
"record.column1",
"record.column2",
"record.column3",
F.to_json("record.column4").alias("column4"),
)
df.show()
------- ------- ------- --------------------
|column1|column2|column3| column4|
------- ------- ------- --------------------
| Value1| Value2| Value3|{"sub1":"Value4",...|
| Value8| Value9|Value10|{"sub1":"Value11"...|
------- ------- ------- --------------------
df.printSchema()
root
|-- column1: string (nullable = true)
|-- column2: string (nullable = true)
|-- column3: string (nullable = true)
|-- column4: string (nullable = true)