Loop to iterate join over columns in Pyspark

I have a dataframe:

data = [('s1', 's2'),
       ('s1', 's3'),
       ('s2', 's4'),
       ('s3', 's5'),
       ('s5', 's6')]
sdf = spark.createDataFrame(data, schema = ['start', 'end'])
sdf.show()
+-----+---+
|start|end|
+-----+---+
|   s1| s2|
|   s1| s3|
|   s2| s4|
|   s3| s5|
|   s5| s6|
+-----+---+

I want to check, for each row, whether its end value appears in the start column of another row; if it does, I want to write that matching row's end value into a new end2 column:

new_sdf = sdf.withColumnRenamed('start', 'start2').withColumnRenamed('end', 'end2')
sdf = sdf.join(new_sdf, sdf.end == new_sdf.start2, how='left').drop('start2')

The result is something like this:

+-----+---+----+
|start|end|end2|
+-----+---+----+
|   s1| s2|  s4|
|   s1| s3|  s5|
|   s2| s4|null|
|   s3| s5|  s6|
|   s5| s6|null|
+-----+---+----+

Then I want to join again to see if end2 has values in start, and write the matching end values into a new end3 column, and keep joining like this until the newest column contains only None values (a sketch of one such step is below).
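
For instance, the second step might look like this (a sketch assuming the same renaming approach as above; the step/start3/end3 names are just illustrative):

# Hypothetical second step: match end2 against the original start values
step = sdf.select(sdf.start.alias('start3'), sdf.end.alias('end3'))
sdf = sdf.join(step, sdf.end2 == step.start3, how='left').drop('start3')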

In other words, I need an iterative join across the columns (my real dataframe has many more rows, so writing out each join by hand is not practical), but I don't understand how to do it. The result should be something like this:

+-----+---+----+----+----+
|start|end|end2|end3|end4|
+-----+---+----+----+----+
|   s1| s2|  s4|None|None|
|   s1| s3|  s5|  s6|None|
|   s2| s4|None|None|None|
|   s3| s5|  s6|None|None|
|   s5| s6|None|None|None|
+-----+---+----+----+----+

CodePudding user response:

The logic is to loop while the latest "end_i" column still contains a non-null value:

  • Self-join where start == "end_i"
  • Drop the excess columns and rename the newly joined "end" to "end_{i+1}"

from pyspark.sql import functions as F

data = [('s1', 's2'),
        ('s1', 's3'),
        ('s2', 's4'),
        ('s3', 's5'),
        ('s5', 's6')]
sdf = spark.createDataFrame(data, schema=['start', 'end'])

i = 0
sdf.show()
# Loop while the most recent "end" column has at least one non-null value
while sdf.filter(F.col(f"end{i if i > 0 else ''}").isNotNull()).count() > 0:
    # Self-join: match the latest "end" column against "start",
    # keep the newly joined "end" as the next "end{i+1}" column
    sdf = sdf.alias("s1").join(sdf.alias("s2").select("start", "end"),
                               F.col(f"s1.end{i if i > 0 else ''}") == F.col("s2.start"),
                               how="left") \
             .withColumn(f"end{i+1}", F.col("s2.end")) \
             .drop(F.col("s2.start")) \
             .drop(F.col("s2.end"))
    i += 1
    sdf.show()

Output:

+-----+---+
|start|end|
+-----+---+
|   s1| s2|
|   s1| s3|
|   s2| s4|
|   s3| s5|
|   s5| s6|
+-----+---+

+-----+---+----+
|start|end|end1|
+-----+---+----+
|   s5| s6|null|
|   s2| s4|null|
|   s3| s5|  s6|
|   s1| s2|  s4|
|   s1| s3|  s5|
+-----+---+----+

+-----+---+----+----+
|start|end|end1|end2|
+-----+---+----+----+
|   s5| s6|null|null|
|   s2| s4|null|null|
|   s3| s5|  s6|null|
|   s1| s2|  s4|null|
|   s1| s3|  s5|  s6|
+-----+---+----+----+

+-----+---+----+----+----+
|start|end|end1|end2|end3|
+-----+---+----+----+----+
|   s1| s3|  s5|  s6|null|
|   s5| s6|null|null|null|
|   s2| s4|null|null|null|
|   s3| s5|  s6|null|null|
|   s1| s2|  s4|null|null|
+-----+---+----+----+----+
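
Note that each pass through the loop triggers a count() action, and the join lineage grows with every iteration, so on larger data the query plan can get expensive to recompute. A minimal sketch of one mitigation, assuming a writable checkpoint directory (the path below is illustrative):

# Set once, before the loop; the directory path is illustrative
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")

# At the end of each iteration, materialize the result and truncate the plan
sdf = sdf.checkpoint()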