Better way to find all paths in PySpark


I have a list of connections like the following:

Predecessor  Successor
A            B
A            C
B            D
D            E
C            F
I            J
J            I
J            K
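
For reference, a minimal sketch of how this edge list could be built as a PySpark DataFrame (column names taken from the table above):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Edge list from the table above, as (Predecessor, Successor) pairs.
df = spark.createDataFrame(
    [
        ("A", "B"), ("A", "C"), ("B", "D"), ("D", "E"),
        ("C", "F"), ("I", "J"), ("J", "I"), ("J", "K"),
    ],
    ["Predecessor", "Successor"],
)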

And I am trying to build, for each root node, all the possible paths, like the following:

Root  Path
A     [A, B, D, E]
A     [A, C, F]
I     [I, J, K]

So I came up with the following code:

# Creating lists of root (no predecessor) and tail (no successor) nodes:
root_nodes = df.select(F.col("Predecessor")).join(
    df.select(F.col("Successor")),
    [F.col("Predecessor") == F.col("Successor")],
    'leftanti'
).dropDuplicates().withColumnRenamed("Predecessor", "Root")

end_nodes = df.select(F.col("Successor")).join(
    df.select(F.col("Predecessor")),
    [F.col("Predecessor") == F.col("Successor")],
    'leftanti'
).dropDuplicates()


# We start from all the root nodes, which all have at least 1 successor
Result = root_nodes

# Adding an array column to list all the nodes in this path 
Result = Result.withColumn("All_nodes", F.array(F.col("Material")))

# Add the first successor element to create the structure of the dataframe:
Result = Result.join(
    df.select(['Predecessor', 'Successor']).dropDuplicates(),
    [F.col("Root") == F.col("Predecessor")],
    'left'
    ).drop('Predecessor')
Result = Result.withColumnRenamed('Successor', 'iter_0')
Result = Result.withColumn("All_nodes", F.array_union(F.col("All_nodes"), F.array(F.col("iter_0"))))

Result = Result.join(
    df.select(['Predecessor', 'Successor']).dropDuplicates(),
    [
        F.col("iter_0") == F.col("Predecessor"),
        F.col("Root") != F.col("material_1"),
        ~F.array_contains(F.col("All_nodes"), F.col("Successor"))
    ],
    'left'
    ).drop('Predecessor')
Result = Result.withColumnRenamed('Successor', 'iter_1')
Result = Result.withColumn("All_nodes", F.array_union(F.col("All_nodes"), F.array(F.col("iter_1"))))

# loop until the latest successors don't have any successor anymore
have_succ = True
i = 0
while have_succ:
    # Create all column names for this iteration --> iter_N
    anti_pred = "iter_" + str(i)
    pred = "iter_" + str(i + 1)
    succ = "iter_" + str(i + 2)
    Result = Result.join(
        df.select(['Predecessor', 'Successor']).dropDuplicates(),
        [
            F.col(pred) == F.col("Predecessor"),
            F.col(anti_pred) != F.col("Predecessor"),
            ~F.array_contains(F.col("All_nodes"), F.col("Successor"))
        ],
        'left'
        ).drop('Predecessor')
    Result = Result.withColumnRenamed('Successor', succ)
    Result = Result.withColumn("All_nodes", F.array_union(F.col("All_nodes"), F.array(F.col(succ))))

    # increment i
    i += 1

    # Check if the last column is full of null values (no more successors) to stop the loop
    if Result.select(F.col(succ)).filter(~F.col(succ).isNull()).distinct().count() == 0:
        print("break")
        have_succ = False

    # Add an escape condition in case of infinite loop
    elif i > 300:
        have_succ = False

    else:
        print("Continue", i, " -- > ", (i 1))

return Result

Which gives me a result that looks like this:

Root  All_nodes     iter_0  iter_1  iter_2  iter_3
A     [A, B, D, E]  B       D       E       null
A     [A, C, F]     C       F       null    null
I     [I, J, K]     J       K       null    null

Which is what I'm looking for, but I think there is a better way to do it: when I execute it on a larger dataframe, it will perform more than 100 joins this way.

Do you have any ideas to improve this?

CodePudding user response:

There are libraries specifically designed for working with graphs (the kind of data representation you seem to be working with). Please take a look at the GraphFrames shortest-paths algorithm. This library works on top of Spark.
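
As a rough sketch of what that could look like (assuming GraphFrames is installed and that df is the edge DataFrame from the question), you could build a GraphFrame from the edge list and query it directly:

from graphframes import GraphFrame
from pyspark.sql import functions as F

# GraphFrames expects a vertex column "id" and edge columns "src"/"dst".
edges = df.select(
    F.col("Predecessor").alias("src"),
    F.col("Successor").alias("dst"),
)
vertices = (
    edges.select(F.col("src").alias("id"))
    .union(edges.select(F.col("dst").alias("id")))
    .distinct()
)

g = GraphFrame(vertices, edges)

# Number of hops from every vertex to the chosen landmark vertices
# (here, the leaf nodes of the example data).
distances = g.shortestPaths(landmarks=["E", "F", "K"])
distances.select("id", "distances").show(truncate=False)

Note that shortestPaths returns hop counts rather than the node sequences themselves; if you need the actual paths, GraphFrames also exposes breadth-first search (g.bfs) and motif finding, which return the intermediate vertices.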
