How to separate CSV values within a cell in a PySpark dataframe to new columns and their values respectively


The current Spark dataframe has CSV values at the cell level in one column, and I am trying to explode them into new columns. Here is the example dataframe:

    a_id                                    features
1   2020     "a","b","c","d","constant1","1","0.1","aa"
2   2021     "a","b","c","d","constant2","1","0.2","ab"
3   2022     "a","b","c","d","constant3","1","0.3","ac","a","b","c","d","constant3","1.1","3.3","acx"
4   2023     "a","b","c","d","constant4","1","0.4","ad"
5   2024     "a","b","c","d","constant5","1","0.5","ae","a","b","c","d","constant5","1.2","6.3","xwy","a","b","c","d","constant5","2.2","8.3","bunr"
6   2025     "a","b","c","d","constant6","1","0.6","af"

The features column holds multiple CSV values, in which (a, b, c, d) act as a header and are repeated in some cells (rows 3 and 5). I want to extract only one header and its respective values. The expected output dataframe is shown below.

Output spark dataframe

    a_id       a        d
1   2020   constant1   ["aa"]
2   2021   constant2   ["ab"]
3   2022   constant3   ["ac","acx"]
4   2023   constant4   ["ad"]
5   2024   constant5   ["ae","xwy","bunr"]
6   2025   constant6   ["af"]

As shown, I would like to extract only the a and d headers as new columns, where a is constant and d can have multiple values, which are collected into a list.

Please help me convert this in PySpark. The above dataframe is a streaming dataframe in real time.
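
For reproducibility, the example dataframe can be built as a static batch like this (a minimal sketch for testing; the real source is a streaming dataframe):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# sample data copied from the table above; features is a plain string column
data = [
    (2020, '"a","b","c","d","constant1","1","0.1","aa"'),
    (2021, '"a","b","c","d","constant2","1","0.2","ab"'),
    (2022, '"a","b","c","d","constant3","1","0.3","ac","a","b","c","d","constant3","1.1","3.3","acx"'),
    (2023, '"a","b","c","d","constant4","1","0.4","ad"'),
    (2024, '"a","b","c","d","constant5","1","0.5","ae","a","b","c","d","constant5","1.2","6.3","xwy","a","b","c","d","constant5","2.2","8.3","bunr"'),
    (2025, '"a","b","c","d","constant6","1","0.6","af"'),
]
df = spark.createDataFrame(data, ["a_id", "features"])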

CodePudding user response:

Using only PySpark/Spark SQL functions:

  • remove the headers from the string
  • extract the substrings using regexp_extract_all, breaking the string into substrings after every fourth ,
  • explode the result and filter out empty strings
  • split the result on , again, so each CSV value becomes an element of an array
  • create columns a and d from the first and fourth elements of the array
  • group by a_id, keeping the first a and collecting the d values into a list
from pyspark.sql import functions as F

header = '"a","b","c","d",'
num_headers = header.count(",")  # 4: one comma after each header field

(df
   # drop every occurrence of the repeated header from the string
   .withColumn("features", F.expr(f"replace(features, '{header}')"))
   # grab groups of 4 comma-separated tokens; the f-string renders the
   # quantifier as {4}, so the pattern becomes (([^,]*,?){4})
   .withColumn("features", F.expr(f"regexp_extract_all(features, '(([^,]*,?)\\{{{num_headers}}})')"))
   # one row per extracted group; drop the empty trailing match
   .withColumn("features", F.explode("features"))
   .filter("features != ''")
   # split each group so every CSV value is an array element
   .withColumn("features", F.split("features", ","))
   # a is the first element, d the fourth
   .withColumn("a", F.expr("features[0]"))
   .withColumn("d", F.expr("features[3]"))
   # one output row per a_id: constant a, list of d values
   .groupBy("a_id")
   .agg(F.first("a").alias("a"), F.collect_list("d").alias("d"))
   .show(truncate=False))
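
To make the intermediate steps concrete, here is roughly how the features value for a_id 2022 (row 3) evolves through the chain (an illustrative trace, not captured output):

# original:
#   '"a","b","c","d","constant3","1","0.3","ac","a","b","c","d","constant3","1.1","3.3","acx"'
# after replace (headers removed):
#   '"constant3","1","0.3","ac","constant3","1.1","3.3","acx"'
# after regexp_extract_all (groups of four tokens, plus an empty trailing match):
#   ['"constant3","1","0.3","ac",', '"constant3","1.1","3.3","acx"', '']
# after explode + filter + split, one array per row:
#   ['"constant3"', '"1"', '"0.3"', '"ac"', '']   and   ['"constant3"', '"1.1"', '"3.3"', '"acx"']
# so a = '"constant3"' and d collects to ['"ac"', '"acx"']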

Output:

+----+-----------+---------------------+
|a_id|a          |d                    |
+----+-----------+---------------------+
|2020|"constant1"|["aa"]               |
|2022|"constant3"|["ac", "acx"]        |
|2025|"constant6"|["af"]               |
|2023|"constant4"|["ad"]               |
|2021|"constant2"|["ab"]               |
|2024|"constant5"|["ae", "xwy", "bunr"]|
+----+-----------+---------------------+
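
A caveat for the streaming case mentioned in the question: show() is not available on a streaming DataFrame, and a groupBy aggregation like this one needs the update or complete output mode. Below is a minimal sketch of wiring the same chain into writeStream, reusing header, num_headers and F from above; the wrapper name transform_features, the streaming_df source and the console sink are illustrative assumptions, not part of the original answer:

def transform_features(sdf):
    # same chain as the batch version above
    return (sdf
        .withColumn("features", F.expr(f"replace(features, '{header}')"))
        .withColumn("features", F.expr(
            f"regexp_extract_all(features, '(([^,]*,?)\\{{{num_headers}}})')"))
        .withColumn("features", F.explode("features"))
        .filter("features != ''")
        .withColumn("features", F.split("features", ","))
        .withColumn("a", F.expr("features[0]"))
        .withColumn("d", F.expr("features[3]"))
        .groupBy("a_id")
        .agg(F.first("a").alias("a"), F.collect_list("d").alias("d")))

# streaming_df is assumed to come from spark.readStream with the same schema
query = (transform_features(streaming_df)
         .writeStream
         .outputMode("complete")   # required for this streaming aggregation
         .format("console")
         .start())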