Split Complex String in PySpark Dataframe Column


I have a PySpark dataframe column containing multiple addresses. The format is as below:

id       addresses
1       [{"city":null,"state":null,"street":"123, ABC St, ABC  Square","postalCode":"11111","country":"USA"},{"city":"Dallas","state":"TX","street":"456, DEF Plaza, Test St","postalCode":"99999","country":"USA"}]

I want to transform it as below:

id  city    state  street                    postalCode  country
1   null    null   123, ABC St, ABC  Square  11111       USA
1   Dallas  TX     456, DEF Plaza, Test St   99999       USA

Any inputs on how to achieve this using PySpark? The dataset is huge (several TBs), so I want to do this efficiently.

I tried splitting the address string on commas; however, since there are commas within the addresses themselves, the output is not as expected. I guess I need a regular expression pattern with the braces, but I'm not sure how. Moreover, how do I go about denormalizing the data?
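For reference, here is the flattening I'm after, sketched in plain Python on the sample row above (`json.loads` handles the commas embedded in `street`, which my comma split did not). I need the equivalent at multi-TB Spark scale:

```python
# Plain-Python sketch of the desired transform on the sample row from above.
# Parsing the column as JSON keeps commas inside "street" intact, and
# flattening the array yields one output row per address.
import json

addresses = ('[{"city":null,"state":null,"street":"123, ABC St, ABC  Square",'
             '"postalCode":"11111","country":"USA"},'
             '{"city":"Dallas","state":"TX","street":"456, DEF Plaza, Test St",'
             '"postalCode":"99999","country":"USA"}]')

# Denormalize: repeat the id for each address in the array
rows = [{"id": 1, **addr} for addr in json.loads(addresses)]
for r in rows:
    print(r)
```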

CodePudding user response:

# Data
from pyspark.sql.functions import *

df = spark.createDataFrame(
    [(1, '{"city":"New York","state":"NY","street":"123, ABC St, ABC  Square","postalCode":"11111","country":"USA"},{"city":"Dallas","state":"TX","street":"456, DEF Plaza, Test St","postalCode":"99999","country":"USA"}')],
    ('id', 'addresses'))
df.show(truncate=False)

# Pass the string column to an RDD so spark.read.json can infer the schema
rdd = df.select(col("addresses").alias("jsoncol")).rdd.map(lambda x: x.jsoncol)
newschema = spark.read.json(rdd).schema

# Apply the inferred schema to the string column using from_json
df3 = df.select("*", from_json("addresses", newschema).alias("test_col"))

df3.select('id', 'test_col.*').show(truncate=False)

+---+--------+-------+----------+-----+------------------------+
|id |city    |country|postalCode|state|street                  |
+---+--------+-------+----------+-----+------------------------+
|1  |New York|USA    |11111     |NY   |123, ABC St, ABC  Square|
+---+--------+-------+----------+-----+------------------------+

CodePudding user response:

This is what worked for me. Thanks to @wwnde for their solution! I had to tweak it a bit, as I found out that Spark 3.0 cannot parse JSON arrays as structs (link).

@wwnde: I want to upvote your answer but unfortunately SO isn't allowing me as I haven't earned enough reputation yet.

from pyspark.sql.types import ArrayType, StructType, StructField, StringType

newschema = ArrayType(StructType([
    StructField("city", StringType(), True),
    StructField("state", StringType(), True),
    StructField("street", StringType(), True),
    StructField("postalCode", StringType(), True),
    StructField("country", StringType(), True),
]))

dpDF_transformed = dpDF.select("*", from_json("addresses", newschema).alias("addresses_transformed"))

# from_json with an ArrayType schema yields array columns; stripping the
# square brackets turns e.g. [Dallas] back into Dallas
dpDF_final = dpDF_transformed \
    .withColumn("addl_addr_city", regexp_replace(regexp_replace("addresses_transformed.city", "\\[", ""), "\\]", "")) \
    .withColumn("addl_addr_state", regexp_replace(regexp_replace("addresses_transformed.state", "\\[", ""), "\\]", "")) \
    .withColumn("addl_addr_street", regexp_replace(regexp_replace("addresses_transformed.street", "\\[", ""), "\\]", "")) \
    .withColumn("addl_addr_postalCode", regexp_replace(regexp_replace("addresses_transformed.postalCode", "\\[", ""), "\\]", "")) \
    .withColumn("addl_addr_country", regexp_replace(regexp_replace("addresses_transformed.country", "\\[", ""), "\\]", ""))

dpDF_final.select("id", "addl_addr_city", "addl_addr_state", "addl_addr_street", "addl_addr_postalCode", "addl_addr_country").show(5, False)