I am reading a legacy file into a DataFrame, and it looks something like this:
+----+-----+------------+------+-------+--------+
| c1 | c2  | c3         | c4   | c5    | c6     |
+----+-----+------------+------+-------+--------+
| 01 | B01 | null       | null | file1 | B01-01 |
| 06 | B01 | foo        | bar  | file1 | B01-02 |
| 06 | B01 | foo        | bar  | file1 | B01-03 |
| 09 | B01 | 2021-12-07 | null | file1 | B01-04 |
| 01 | B02 | null       | null | file2 | B02-01 |
| 09 | B02 | 2021-12-05 | null | file2 | B02-02 |
| 01 | B03 | null       | null | file3 | B03-01 |
| 06 | B03 | foo        | bar  | file3 | B03-02 |
| 06 | B03 | foo        | bar  | file3 | B03-03 |
| 09 | B03 | 2021-12-07 | null | file3 | B03-04 |
| 01 | B01 | null       | null | file4 | B01-01 |
| 06 | B01 | foo        | bar  | file4 | B01-02 |
| 06 | B01 | foo        | bar  | file4 | B01-03 |
| 09 | B01 | 2021-12-06 | null | file4 | B01-04 |
+----+-----+------------+------+-------+--------+
One physical file contains multiple logical files, each consisting of a header record (01), detail records (06), and a trailer record (09); sometimes a logical file has only a header and a trailer.
I want to take the date from the trailer of each logical file and add it as a column to that whole block of records, as shown below.
+----+-----+------------+------+-------+--------+------------+
| c1 | c2  | c3         | c4   | c5    | c6     | c7         |
+----+-----+------------+------+-------+--------+------------+
| 01 | B01 | null       | null | file1 | B01-01 | 2021-12-07 |
| 06 | B01 | foo        | bar  | file1 | B01-02 | 2021-12-07 |
| 06 | B01 | foo        | bar  | file1 | B01-03 | 2021-12-07 |
| 09 | B01 | 2021-12-07 | null | file1 | B01-04 | 2021-12-07 |
| 01 | B02 | null       | null | file2 | B02-01 | 2021-12-05 |
| 09 | B02 | 2021-12-05 | null | file2 | B02-02 | 2021-12-05 |
| 01 | B03 | null       | null | file3 | B03-01 | 2021-12-07 |
| 06 | B03 | foo        | bar  | file3 | B03-02 | 2021-12-07 |
| 06 | B03 | foo        | bar  | file3 | B03-03 | 2021-12-07 |
| 09 | B03 | 2021-12-07 | null | file3 | B03-04 | 2021-12-07 |
| 01 | B01 | null       | null | file4 | B01-01 | 2021-12-06 |
| 06 | B01 | foo        | bar  | file4 | B01-02 | 2021-12-06 |
| 06 | B01 | foo        | bar  | file4 | B01-03 | 2021-12-06 |
| 09 | B01 | 2021-12-06 | null | file4 | B01-04 | 2021-12-06 |
+----+-----+------------+------+-------+--------+------------+
I tried the Window functionality with rowsBetween(unboundedPreceding, unboundedFollowing), but couldn't get anywhere.
CodePudding user response:
You can filter the trailer records out of the original df and rename column c3 to c7. Finally, join the original DataFrame and the filtered DataFrame on the filename column c5.
from pyspark.sql import functions as F

data = [("01", "B01", None, None, "file1", "B01-01"),
        ("06", "B01", "foo", "bar", "file1", "B01-02"),
        ("06", "B01", "foo", "bar", "file1", "B01-03"),
        ("09", "B01", "2021-12-07", None, "file1", "B01-04"),
        ("01", "B02", None, None, "file2", "B02-01"),
        ("09", "B02", "2021-12-05", None, "file2", "B02-02"),
        ("01", "B03", None, None, "file3", "B03-01"),
        ("06", "B03", "foo", "bar", "file3", "B03-02"),
        ("06", "B03", "foo", "bar", "file3", "B03-03"),
        ("09", "B03", "2021-12-07", None, "file3", "B03-04"),
        ("01", "B01", None, None, "file4", "B01-01"),
        ("06", "B01", "foo", "bar", "file4", "B01-02"),
        ("06", "B01", "foo", "bar", "file4", "B01-03"),
        ("09", "B01", "2021-12-06", None, "file4", "B01-04")]

df = spark.createDataFrame(data, ("c1", "c2", "c3", "c4", "c5", "c6"))

# Keep only the trailer records (c1 == "09") and expose their date as c7
df_trailer = df.filter(F.col("c1") == "09").selectExpr("c5", "c3 as c7")

# Join the trailer date back onto every record of the same file
df.join(df_trailer, ["c5"]).show()
Output:
+-----+---+---+----------+----+------+----------+
|   c5| c1| c2|        c3|  c4|    c6|        c7|
+-----+---+---+----------+----+------+----------+
|file1| 01|B01|      null|null|B01-01|2021-12-07|
|file1| 06|B01|       foo| bar|B01-02|2021-12-07|
|file1| 06|B01|       foo| bar|B01-03|2021-12-07|
|file1| 09|B01|2021-12-07|null|B01-04|2021-12-07|
|file2| 01|B02|      null|null|B02-01|2021-12-05|
|file2| 09|B02|2021-12-05|null|B02-02|2021-12-05|
|file3| 01|B03|      null|null|B03-01|2021-12-07|
|file3| 06|B03|       foo| bar|B03-02|2021-12-07|
|file3| 06|B03|       foo| bar|B03-03|2021-12-07|
|file3| 09|B03|2021-12-07|null|B03-04|2021-12-07|
|file4| 01|B01|      null|null|B01-01|2021-12-06|
|file4| 06|B01|       foo| bar|B01-02|2021-12-06|
|file4| 06|B01|       foo| bar|B01-03|2021-12-06|
|file4| 09|B01|2021-12-06|null|B01-04|2021-12-06|
+-----+---+---+----------+----+------+----------+
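Note that the join moves the key column c5 to the front. If the original column order matters, a trailing select can restore it; a minimal sketch, where result is just a local name for the joined DataFrame from above:

result = df.join(df_trailer, ["c5"])

# Restore the original column order, with the new c7 appended at the end
result.select("c1", "c2", "c3", "c4", "c5", "c6", "c7").show()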
CodePudding user response:
I was able to solve this using a Window function:
from pyspark.sql import functions as sf
from pyspark.sql.window import Window

# Frame spans the whole c5 partition; ordering by c1 puts the trailer (09) last
w = Window.partitionBy('c5').orderBy('c1').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
new_df = df.withColumn('c7', sf.last('c3').over(w))
Created groups based on c5, ordered by c1 so the trailer record sorts last, and then picked the last value of c3 over the full window. Added that as a new column c7.
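Since the trailer date is the only non-null c3 in each block, a slightly more defensive variant of the same approach is to pass ignorenulls=True to last, so the window picks the last non-null value rather than relying on the trailer being the physically last row:

# Same window; ignorenulls=True skips the null c3 on header/detail rows
new_df = df.withColumn('c7', sf.last('c3', ignorenulls=True).over(w))
new_df.show()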