I have a list of connections like the following:
Predecessor | Successor |
---|---|
A | B |
A | C |
B | D |
D | E |
C | F |
I | J |
J | I |
J | K |
For each root node, I am trying to build every possible path, like the following:
Root | Successor |
---|---|
A | [A, B, D, E] |
A | [A, C, F] |
I | [I, J, K] |
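To pin down the expected result, the table above can be reproduced with a small pure-Python sketch (no Spark involved). It assumes the root nodes are passed in explicitly, because I also appears as a successor of J and so would not be found by a "no predecessor" rule. The names `all_paths`, `edges` and `roots` are made up for illustration:

```python
from collections import defaultdict


def all_paths(edges, roots):
    """Return every maximal simple path that starts at one of the given roots."""
    successors = defaultdict(list)
    for pred, succ in edges:
        successors[pred].append(succ)

    paths = []

    def walk(path):
        # Only follow successors that are not already on the path,
        # so a cycle such as I -> J -> I cannot loop forever.
        next_nodes = [n for n in successors[path[-1]] if n not in path]
        if not next_nodes:
            paths.append(path)  # nothing left to visit: the path is complete
            return
        for node in next_nodes:
            walk(path + [node])

    for root in roots:
        walk([root])
    return paths


edges = [
    ("A", "B"), ("A", "C"), ("B", "D"), ("D", "E"),
    ("C", "F"), ("I", "J"), ("J", "I"), ("J", "K"),
]
for path in all_paths(edges, roots=["A", "I"]):
    print(path[0], path)
```

This is only a reference for what the output should contain; it runs on a single machine and does not address the scaling concern.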
So I came up with the following code:
from pyspark.sql import functions as F

# Root nodes have no predecessor; end (tail) nodes have no successor:
root_nodes = (
    df.select(F.col("Predecessor").alias("Root"))
    .join(df.select("Successor"), [F.col("Root") == F.col("Successor")], 'leftanti')
    .dropDuplicates()
)
end_nodes = (
    df.select("Successor")
    .join(df.select("Predecessor"), [F.col("Predecessor") == F.col("Successor")], 'leftanti')
    .dropDuplicates()
)
# Start from the root nodes, which all have at least one successor
Result = root_nodes
# Add an array column listing all the nodes on this path
Result = Result.withColumn("All_nodes", F.array(F.col("Root")))
# Add the first successor to create the structure of the dataframe:
Result = Result.join(
    df.select(['Predecessor', 'Successor']).dropDuplicates(),
    [F.col("Root") == F.col("Predecessor")],
    'left'
).drop('Predecessor')
Result = Result.withColumnRenamed('Successor', 'iter_0')
Result = Result.withColumn("All_nodes", F.array_union(F.col("All_nodes"), F.array(F.col("iter_0"))))
Result = Result.join(
    df.select(['Predecessor', 'Successor']).dropDuplicates(),
    [
        F.col("iter_0") == F.col("Predecessor"),
        F.col("Root") != F.col("Predecessor"),
        ~F.array_contains(F.col("All_nodes"), F.col("Successor"))
    ],
    'left'
).drop('Predecessor')
Result = Result.withColumnRenamed('Successor', 'iter_1')
Result = Result.withColumn("All_nodes", F.array_union(F.col("All_nodes"), F.array(F.col("iter_1"))))
# Loop until no path has a successor left
have_succ = True
i = 0
while have_succ:
    # Build the column names for this iteration --> iter_N
    anti_pred = "iter_" + str(i)
    pred = "iter_" + str(i + 1)
    succ = "iter_" + str(i + 2)
    Result = Result.join(
        df.select(['Predecessor', 'Successor']).dropDuplicates(),
        [
            F.col(pred) == F.col("Predecessor"),
            F.col(anti_pred) != F.col("Predecessor"),
            ~F.array_contains(F.col("All_nodes"), F.col("Successor"))
        ],
        'left'
    ).drop('Predecessor')
    Result = Result.withColumnRenamed('Successor', succ)
    Result = Result.withColumn("All_nodes", F.array_union(F.col("All_nodes"), F.array(F.col(succ))))
    # Increment i
    i += 1
    # Stop the loop when the last column contains only nulls (no more successors)
    if Result.select(F.col(succ)).filter(~F.col(succ).isNull()).distinct().count() == 0:
        print("break")
        have_succ = False
    # Escape condition in case of an infinite loop
    elif i > 300:
        have_succ = False
    else:
        print("Continue", i, " -- > ", (i + 1))
Result.show(truncate=False)
This gives me a result that looks like this:
Root | all_nodes | iter_0 | iter_1 | iter_2 | iter_3 |
---|---|---|---|---|---|
A | [A, B, D, E] | B | D | E | null |
A | [A, C, F] | C | F | null | null |
I | [I, J, K] | J | K | null | null |
This is what I'm looking for, but I think there must be a better way to do it: on a larger dataframe, this approach will perform more than 100 joins.
Is there any way to improve this?
CodePudding user response:
There are libraries built specifically for working with graphs, which is the kind of data you seem to be working with. Take a look at the GraphFrames shortest-paths algorithm. This library works on top of Spark.
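As a rough sketch of what that could look like, assuming the `graphframes` package is installed alongside Spark and a `SparkSession` is available: GraphFrames expects a vertex DataFrame with an `id` column and an edge DataFrame with `src` and `dst` columns. The helper names `to_graph_rows` and `hop_counts_to` are hypothetical. Note that, as far as I know, `shortestPaths` returns hop counts to the chosen landmark nodes rather than the list of nodes on each path, so it may only cover part of what the question asks for.

```python
def to_graph_rows(connections):
    """Split (predecessor, successor) pairs into vertex rows and edge rows."""
    vertices = sorted({node for pair in connections for node in pair})
    return [(v,) for v in vertices], list(connections)


def hop_counts_to(spark, connections, landmarks):
    """Hop count from every node to each landmark (assumes graphframes is installed)."""
    # Imported lazily because it needs the graphframes Spark package.
    from graphframes import GraphFrame

    vertex_rows, edge_rows = to_graph_rows(connections)
    vertices = spark.createDataFrame(vertex_rows, ["id"])     # vertex column must be "id"
    edges = spark.createDataFrame(edge_rows, ["src", "dst"])  # edge columns must be "src"/"dst"
    graph = GraphFrame(vertices, edges)
    # One row per vertex, with a "distances" map: landmark -> number of hops.
    return graph.shortestPaths(landmarks=landmarks)
```

With the connections from the question, something like `hop_counts_to(spark, pairs, landmarks=["E", "F", "K"]).show()` would list, for each node, how many hops separate it from each end node it can reach.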