Longest consecutive 1s from consecutive columns in a PySpark dataframe


Suppose I have a PySpark dataframe as follows:

b1  b2  b3  b4  b5  b6
1   1   1   0   1   1

test_df = spark.createDataFrame([
    (1, 1, 1, 0, 1, 1)
], ("b1", "b2", "b3", "b4", "b5", "b6"))

Here, the length of the longest run of consecutive 1s is 3. I have a large dataset of 1M rows, and I want to compute this value for each row.
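To illustrate, this is the computation for a single row in plain Python (not Spark code, just to pin down the expected result):

# Plain-Python sketch of the target computation for one row
row = (1, 1, 1, 0, 1, 1)
runs = "".join(str(v) for v in row).split("0")  # ['111', '11']
print(max(len(r) for r in runs))  # 3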

My initial idea was to make a new column that concatenates the values of all the columns. So I first concatenated all of the column values into a new string column:

test_df = test_df.withColumn('Ind', F.concat(*cols))

I got the dataframe like this:

b1  b2  b3  b4  b5  b6  Ind
1   1   1   0   1   1   '111011'

Then I wrote a separate function:

def findMaxConsecutiveOnes(X) -> int:
nums = [int(j) for a,j in enumerate(X)]
count = 0
maxCount = 0

for idx, num in enumerate(nums):
    if num == 1:
        count += 1
    if num == 0 or idx == len(nums) - 1:
        maxCount = max(maxCount, count)
        count = 0

return maxCount

Then I created a UDF from it:

maxcon_udf = udf(lambda x: findMaxConsecutiveOnes(x))  

and finally,

test_df = test_df.withColumn('final', maxcon_udf('ind'))

However, this throws an error. Can someone please help me solve this problem?

CodePudding user response:

After adding cols = test_df.columns and indenting the body of the findMaxConsecutiveOnes function properly, your code works.

Still, I would recommend avoiding UDFs in PySpark when you can, because executing Python code is quite expensive in Spark.

You could solve your problem with built-in Spark functions by:

  1. splitting Ind on any run of characters that is not a 1,
  2. computing the length of each resulting substring,
  3. taking the max of those lengths.

import pyspark.sql.functions as F
cols = test_df.columns

test_df\
    .withColumn('Ind', F.concat(*cols))\
    .withColumn("final", F.array_max(F.transform(F.split("ind", "[^1] "), F.length)))\
    .show()
+---+---+---+---+---+---+------+-----+
| b1| b2| b3| b4| b5| b6|   Ind|final|
+---+---+---+---+---+---+------+-----+
|  1|  1|  1|  0|  1|  1|111011|    3|
+---+---+---+---+---+---+------+-----+
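Note that F.transform only appeared in the Python API in Spark 3.1 (F.array_max in 2.4). If you are on an older 2.4+ version, the same logic can be written as a SQL expression with F.expr; a sketch, assuming the Ind column has already been built as above:

# Higher-order functions are available in Spark SQL since 2.4,
# even where the Python API lags behind
test_df.withColumn(
    "final",
    F.expr("array_max(transform(split(Ind, '[^1]+'), x -> length(x)))")
).show()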

CodePudding user response:

Your code works just fine; maybe you forgot to define cols as test_df.columns:

from pyspark.sql import SparkSession
from pyspark.sql.functions import concat, udf

spark = SparkSession.builder.master("local[*]").getOrCreate()
test_df = spark.createDataFrame([
    (1, 1, 1, 0, 1, 1)
], ("b1", "b2", "b3", "b4", "b5", "b6"))
cols = test_df.columns
test_df = test_df.withColumn('Ind', concat(*cols))

def findMaxConsecutiveOnes(X) -> int:
    nums = [int(j) for a, j in enumerate(X)]
    count = 0
    maxCount = 0
    for idx, num in enumerate(nums):
        if num == 1:
            count += 1
        if num == 0 or idx == len(nums) - 1:
            maxCount = max(maxCount, count)
            count = 0
    return maxCount

maxcon_udf = udf(lambda x: findMaxConsecutiveOnes(x))
test_df.withColumn('final', maxcon_udf('ind')).show()


+---+---+---+---+---+---+------+-----+
| b1| b2| b3| b4| b5| b6|   Ind|final|
+---+---+---+---+---+---+------+-----+
|  1|  1|  1|  0|  1|  1|111011|    3|
+---+---+---+---+---+---+------+-----+
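One detail worth flagging: udf defaults to StringType when no return type is declared, so final above is actually a string column. A minimal tweak if you want a proper integer:

from pyspark.sql.types import IntegerType

# Declaring the return type makes 'final' an int column instead of a string
maxcon_udf = udf(findMaxConsecutiveOnes, IntegerType())
test_df.withColumn('final', maxcon_udf('Ind')).show()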