Pyspark : Split dataframe based on contents and extract date from bottom line of split

Time:12-09

I am reading a legacy file into a DataFrame, and it looks something like this:

+-----------+----------+----------+--------+--------+--------+
|c1         |      c2  | c3       |    c4  |    c5  |    c6  |
+-----------+----------+----------+--------+--------+--------+
| 01        |  B01     |null      |null    |file1   |B01-01  |
| 06        |  B01     |foo       |bar     |file1   |B01-02  |
| 06        |  B01     |foo       |bar     |file1   |B01-03  |
| 09        |  B01     |2021-12-07|null    |file1   |B01-04  |
| 01        |  B02     |null      |null    |file2   |B02-01  |
| 09        |  B02     |2021-12-05|null    |file2   |B02-02  |
| 01        |  B03     |null      |null    |file3   |B03-01  |
| 06        |  B03     |foo       |bar     |file3   |B03-02  |
| 06        |  B03     |foo       |bar     |file3   |B03-03  |
| 09        |  B03     |2021-12-07|null    |file3   |B03-04  |
| 01        |  B01     |null      |null    |file4   |B01-01  |
| 06        |  B01     |foo       |bar     |file4   |B01-02  |
| 06        |  B01     |foo       |bar     |file4   |B01-03  |
| 09        |  B01     |2021-12-06|null    |file4   |B01-04  |
+-----------+----------+----------+--------+--------+--------+

One physical file contains multiple logical files. Each logical file has a header (01), detail records (06), and a trailer (09); sometimes it has only a header and a trailer.

I want to take the date from the trailer of each logical file and add it as a column to every record in that block, as shown below.

+-----------+----------+----------+--------+--------+--------+----------+
|c1         |      c2  | c3       |    c4  |    c5  |    c6  | c7       |
+-----------+----------+----------+--------+--------+--------+----------+
| 01        |  B01     |null      |null    |file1   |B01-01  |2021-12-07|
| 06        |  B01     |foo       |bar     |file1   |B01-02  |2021-12-07|
| 06        |  B01     |foo       |bar     |file1   |B01-03  |2021-12-07|
| 09        |  B01     |2021-12-07|null    |file1   |B01-04  |2021-12-07|
| 01        |  B02     |null      |null    |file2   |B02-01  |2021-12-05|
| 09        |  B02     |2021-12-05|null    |file2   |B02-02  |2021-12-05|
| 01        |  B03     |null      |null    |file3   |B03-01  |2021-12-07|
| 06        |  B03     |foo       |bar     |file3   |B03-02  |2021-12-07|
| 06        |  B03     |foo       |bar     |file3   |B03-03  |2021-12-07|
| 09        |  B03     |2021-12-07|null    |file3   |B03-04  |2021-12-07|
| 01        |  B01     |null      |null    |file4   |B01-01  |2021-12-06|
| 06        |  B01     |foo       |bar     |file4   |B01-02  |2021-12-06|
| 06        |  B01     |foo       |bar     |file4   |B01-03  |2021-12-06|
| 09        |  B01     |2021-12-06|null    |file4   |B01-04  |2021-12-06|
+-----------+----------+----------+--------+--------+--------+----------+

I tried using a Window with rowsBetween, unboundedPreceding, and unboundedFollowing, but couldn't get anywhere.

CodePudding user response:

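You can filter the trailer records (c1 = 09) out of the original df, keeping only the filename column c5 and the date column c3 renamed to c7. Then join the original dataframe with the filtered dataframe on the filename column c5, so every record in a block picks up its trailer date.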


from pyspark.sql import functions as F

data = [("01", "B01", None, None,"file1", "B01-01"),
("06", "B01", "foo", "bar" ,"file1", "B01-02"),
("06", "B01", "foo", "bar" ,"file1", "B01-03"),
("09", "B01", "2021-12-07", None,"file1", "B01-04"),
("01", "B02", None, None,"file2", "B02-01"),
("09", "B02", "2021-12-05", None,"file2", "B02-02"),
("01", "B03", None, None,"file3", "B03-01"),
("06", "B03", "foo", "bar" ,"file3", "B03-02"),
("06", "B03", "foo", "bar" ,"file3", "B03-03"),
("09", "B03", "2021-12-07", None,"file3", "B03-04"),
("01", "B01", None, None,"file4", "B01-01"),
("06", "B01", "foo", "bar" ,"file4", "B01-02"),
("06", "B01", "foo", "bar" ,"file4", "B01-03"),
("09", "B01", "2021-12-06", None,"file4", "B01-04"),]

df = spark.createDataFrame(data, ("c1", "c2", "c3", "c4", "c5", "c6", )) 

# Keep only trailer rows, then project the filename and the trailer date (renamed to c7).
df_trailer = df.filter(F.col("c1") == "09").selectExpr("c5", "c3 as c7")

df.join(df_trailer, ["c5"]).show()

Output

+-----+---+---+----------+----+------+----------+
|   c5| c1| c2|        c3|  c4|    c6|        c7|
+-----+---+---+----------+----+------+----------+
|file1| 01|B01|      null|null|B01-01|2021-12-07|
|file1| 06|B01|       foo| bar|B01-02|2021-12-07|
|file1| 06|B01|       foo| bar|B01-03|2021-12-07|
|file1| 09|B01|2021-12-07|null|B01-04|2021-12-07|
|file2| 01|B02|      null|null|B02-01|2021-12-05|
|file2| 09|B02|2021-12-05|null|B02-02|2021-12-05|
|file3| 01|B03|      null|null|B03-01|2021-12-07|
|file3| 06|B03|       foo| bar|B03-02|2021-12-07|
|file3| 06|B03|       foo| bar|B03-03|2021-12-07|
|file3| 09|B03|2021-12-07|null|B03-04|2021-12-07|
|file4| 01|B01|      null|null|B01-01|2021-12-06|
|file4| 06|B01|       foo| bar|B01-02|2021-12-06|
|file4| 06|B01|       foo| bar|B01-03|2021-12-06|
|file4| 09|B01|2021-12-06|null|B01-04|2021-12-06|
+-----+---+---+----------+----+------+----------+

CodePudding user response:

I was able to solve this using a Window:

from pyspark.sql import functions as sf
from pyspark.sql.window import Window

w = Window.partitionBy('c5').orderBy('c1').rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing)
new_df = df.withColumn('c7', sf.last('c3').over(w))

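I grouped the rows by c5 and ordered them by c1, so the trailer (09) sorts last in each group. With the frame spanning the whole partition, last('c3') returns the trailer's c3 value, which I added as a new column c7.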
