I have a PySpark DataFrame column that contains multiple addresses. The format is as below:
id addresses
1 [{"city":null,"state":null,"street":"123, ABC St, ABC Square","postalCode":"11111","country":"USA"},{"city":"Dallas","state":"TX","street":"456, DEF Plaza, Test St","postalCode":"99999","country":"USA"}]
I want to transform it as below:
id | city | state | street | postalCode | country |
---|---|---|---|---|---|
1 | null | null | 123, ABC St, ABC Square | 11111 | USA |
1 | Dallas | TX | 456, DEF Plaza, Test St | 99999 | USA |
Any inputs on how to achieve this using PySpark? The dataset is huge (several TB), so I want to do this efficiently.
I tried splitting the address string on commas; however, since there are commas within the street values as well, the output is not as expected. I guess I need a regular expression pattern that accounts for the braces, but I'm not sure how to write it. Moreover, how do I go about denormalizing the data into one row per address?
CodePudding user response:
# Data
from pyspark.sql.functions import col, from_json

df = spark.createDataFrame([(1, '{"city":"New York","state":"NY","street":"123, ABC St, ABC Square","postalCode":"11111","country":"USA"},{"city":"Dallas","state":"TX","street":"456, DEF Plaza, Test St","postalCode":"99999","country":"USA"}')],
                           ('id', 'addresses'))
df.show(truncate=False)

# Pass the string column to an RDD so spark.read.json can infer the schema
rdd = df.select(col("addresses").alias("jsoncol")).rdd.map(lambda x: x.jsoncol)
newschema = spark.read.json(rdd).schema

# Apply the inferred schema to the string column using from_json
df3 = df.select("*", from_json("addresses", newschema).alias("test_col"))
df3.select('id', 'test_col.*').show(truncate=False)
+---+--------+-------+----------+-----+-----------------------+
|id |city    |country|postalCode|state|street                 |
+---+--------+-------+----------+-----+-----------------------+
|1  |New York|USA    |11111     |NY   |123, ABC St, ABC Square|
+---+--------+-------+----------+-----+-----------------------+
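For data shaped exactly like the question's (a real JSON array string), the same schema-inference trick should still apply, and explode can then denormalize the array into one row per address. A minimal sketch, reusing the question's sample literal (df_arr, elem_schema, and addr are illustrative names, not part of the code above):

from pyspark.sql.functions import explode, from_json
from pyspark.sql.types import ArrayType

df_arr = spark.createDataFrame(
    [(1, '[{"city":null,"state":null,"street":"123, ABC St, ABC Square","postalCode":"11111","country":"USA"},{"city":"Dallas","state":"TX","street":"456, DEF Plaza, Test St","postalCode":"99999","country":"USA"}]')],
    ('id', 'addresses'))

# spark.read.json flattens a top-level JSON array into one row per element,
# so it infers the element (struct) schema; wrap it in ArrayType for from_json
elem_schema = spark.read.json(df_arr.rdd.map(lambda r: r.addresses)).schema

# explode() emits one row per array element -- the denormalization the question asks for
parsed = df_arr.select("id", explode(from_json("addresses", ArrayType(elem_schema))).alias("addr"))
parsed.select("id", "addr.*").show(truncate=False)

This avoids regex parsing entirely; the JSON is parsed row by row, so it should scale to large data.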
CodePudding user response:
This is what worked for me. Thanks to @wwnde for their solution! I had to tweak it a bit, as I found out that Spark 3.0 cannot parse a JSON array as a struct (link).
@wwnde: I want to upvote your answer but unfortunately SO isn't allowing me as I haven't earned enough reputation yet.
from pyspark.sql.functions import col, from_json, regexp_replace
from pyspark.sql.types import ArrayType, StructType, StructField, StringType

# Spell out the schema explicitly as an array of address structs
newschema = ArrayType(StructType([
    StructField("city", StringType(), True),
    StructField("state", StringType(), True),
    StructField("street", StringType(), True),
    StructField("postalCode", StringType(), True),
    StructField("country", StringType(), True)]))

dpDF_transformed = dpDF.select("*", from_json("addresses", newschema).alias("addresses_transformed"))

# Each extracted field is an array with one element per address; cast it to a
# string explicitly (regexp_replace expects a string column) and strip the brackets
dpDF_final = dpDF_transformed \
    .withColumn("addl_addr_city", regexp_replace(regexp_replace(col("addresses_transformed.city").cast("string"), "\\[", ""), "\\]", "")) \
    .withColumn("addl_addr_state", regexp_replace(regexp_replace(col("addresses_transformed.state").cast("string"), "\\[", ""), "\\]", "")) \
    .withColumn("addl_addr_street", regexp_replace(regexp_replace(col("addresses_transformed.street").cast("string"), "\\[", ""), "\\]", "")) \
    .withColumn("addl_addr_postalCode", regexp_replace(regexp_replace(col("addresses_transformed.postalCode").cast("string"), "\\[", ""), "\\]", "")) \
    .withColumn("addl_addr_country", regexp_replace(regexp_replace(col("addresses_transformed.country").cast("string"), "\\[", ""), "\\]", ""))

dpDF_final.select("id", "addl_addr_city", "addl_addr_state", "addl_addr_street",
                  "addl_addr_postalCode", "addl_addr_country").show(5, False)
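One caveat for anyone copying this: the regexp_replace step leaves each field as a single comma-joined string per id. If you want one row per address, as in the desired output table in the question, exploding the parsed array before flattening the struct should work. A minimal sketch reusing addresses_transformed from above (dpDF_exploded and addr are illustrative names):

from pyspark.sql.functions import explode

# explode() emits one row per element of the address array;
# addr.* then flattens the struct into individual columns
dpDF_exploded = dpDF_transformed.select("id", explode("addresses_transformed").alias("addr"))
dpDF_exploded.select("id", "addr.city", "addr.state", "addr.street", "addr.postalCode", "addr.country").show(5, False)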