Home > Mobile >  Unable to replace a particular value from an array column in Pyspark
Unable to replace a particular value from an array column in Pyspark

Time:10-26

I have a column in my DF where data type is :

testcolumn:array  
--element: struct
-----id:integer   
-----configName: string 
-----desc:string  
-----configparam:array
--------element:map  
-------------key:string
-------------value:string 

testcolumn
Row1:

[{"id":1,"configName":"test1","desc":"Ram1","configparam":[{"removeit":"[]"}]},
{"id":2,"configName":"test2","desc":"Ram2","configparam":[{"removeit":"[]"}]},
{"id":3,"configName":"test3","desc":"Ram1","configparam":[{"paramId":"4","paramvalue":"200"}]}]    

Row2:

[{"id":11,"configName":"test11","desc":"Ram11","configparam":[{"removeit":"[]"}]},
{"id":33,"configName":"test33","desc":"Ram33","configparam":[{"paramId":"43","paramvalue":"300"}]},
{"id":6,"configName":"test26","desc":"Ram26","configparam":[{"removeit":"[]"}]},
{"id":93,"configName":"test93","desc":"Ram93","configparam":[{"paramId":"93","paramvalue":"3009"}]}
]    

I want to remove where configparam is "configparam":[{"removeit":"[]"}] to "configparam":[]

expecting output:
outputcolumn

Row1:

[{"id":1,"configName":"test1","desc":"Ram1","configparam":[]},
{"id":2,"configName":"test2","desc":"Ram2","configparam":[]},
{"id":3,"configName":"test3","desc":"Ram1","configparam":[{"paramId":"4","paramvalue":"200"}]}]    

Row2:

[{"id":11,"configName":"test11","desc":"Ram11","configparam":[]},
{"id":33,"configName":"test33","desc":"Ram33","configparam":[{"paramId":"43","paramvalue":"300"}]},
{"id":6,"configName":"test26","desc":"Ram26","configparam":[]},
{"id":93,"configName":"test93","desc":"Ram93","configparam":[{"paramId":"93","paramvalue":"3009"}]}
]   

I have tried this code but it is not giving me output:

test=df.withColumn('outputcolumn',F.expr("translate"(testcolumn,x-> replace(x,':[{"removeit":"[]"}]','[]'))) 

it will be really great if someone can help me.

CodePudding user response:

Your testcolumn is an array of struct so you cannot do a string operation as it is.

You can do something like this. This will empty configparam completely when it contains a key "removeit".

Example:

"configparam":[{"removeit":[], "otherparam": "value"}] -> "configparam": []
array_has_remove = lambda y: ~F.array_contains(F.map_keys(y), 'removeit')

df = (df.withColumn('outputcolumn', 
          F.transform('testcolumn', 
              lambda x: x.withField('configparam', 
                  F.filter(x['configparam'], array_has_remove)
              )
          )
     ))

Ref: withField, filter, array_contains, map_keys

CodePudding user response:

You have to perform a chain of explode, filter and groupBy operations to achieve this.

First, explode array/struct/map columns to reach to the nested column:

df = df.withColumn("id", F.col("testcolumn")["id"])
df = df.withColumn("configName", F.col("testcolumn")["configName"])
df = df.withColumn("desc", F.col("testcolumn")["desc"])
df = df.withColumn("configparam_exploded", F.explode(F.col("testcolumn")["configparam"]))
df = df.select(df.columns   [F.explode(F.col("configparam_exploded"))])

 ----------------------------------------------------- --- ---------- ---- --------------------------------- ---------- ----- 
|testcolumn                                           |id |configName|desc|configparam_exploded             |key       |value|
 ----------------------------------------------------- --- ---------- ---- --------------------------------- ---------- ----- 
|{1, test1, Ram1, [{removeit -> []}]}                 |1  |test1     |Ram1|{removeit -> []}                 |removeit  |[]   |
|{2, test2, Ram2, [{removeit -> []}]}                 |2  |test2     |Ram2|{removeit -> []}                 |removeit  |[]   |
|{3, test3, Ram1, [{paramId -> 4, paramvalue -> 200}]}|3  |test3     |Ram1|{paramId -> 4, paramvalue -> 200}|paramId   |4    |
|{3, test3, Ram1, [{paramId -> 4, paramvalue -> 200}]}|3  |test3     |Ram1|{paramId -> 4, paramvalue -> 200}|paramvalue|200  |
 ----------------------------------------------------- --- ---------- ---- --------------------------------- ---------- ----- 

Then, filter data as required:

df = df.filter((F.col("key") != "removeit") | (F.col("value") != "[]"))

 ----------------------------------------------------- --- ---------- ---- --------------------------------- ---------- ----- 
|testcolumn                                           |id |configName|desc|configparam_exploded             |key       |value|
 ----------------------------------------------------- --- ---------- ---- --------------------------------- ---------- ----- 
|{3, test3, Ram1, [{paramId -> 4, paramvalue -> 200}]}|3  |test3     |Ram1|{paramId -> 4, paramvalue -> 200}|paramId   |4    |
|{3, test3, Ram1, [{paramId -> 4, paramvalue -> 200}]}|3  |test3     |Ram1|{paramId -> 4, paramvalue -> 200}|paramvalue|200  |
 ----------------------------------------------------- --- ---------- ---- --------------------------------- ---------- ----- 

Finally, groupBy all individual columns back to original packing:

df = df.withColumn("configparam_map", F.map_from_entries(F.array(F.struct("key", "value"))))
df = df.groupBy(["id", "configName", "desc"]).agg(F.collect_list("configparam_map").alias("configparam"))
df = df.withColumn("testcolumn", F.struct("id", "configName", "desc", "configparam"))
df = df.drop("id", "configName", "desc", "configparam")

 ------------------------------------------------------- 
|testcolumn                                             |
 ------------------------------------------------------- 
|{3, test3, Ram1, [{paramId -> 4}, {paramvalue -> 200}]}|
 ------------------------------------------------------- 

Sample dataset used to reproduce the problem:

schema = StructType([StructField('testcolumn', StructType([StructField('id', IntegerType(), True), StructField('configName', StringType(), True), StructField('desc', StringType(), True), StructField('configparam', ArrayType(MapType(StringType(), StringType(), True), True), True)]), True)])

data = [
  Row(Row(1, "test1", "Ram1", [{"removeit":"[]"}])),
  Row(Row(2, "test2", "Ram2", [{"removeit":"[]"}])),
  Row(Row(3, "test3", "Ram1", [{"paramId":"4","paramvalue":"200"}]))    
]

df = spark.createDataFrame(data = data, schema = schema)
  • Related