Suppose I have a PySpark DataFrame as follows:
b1 b2 b3 b4 b5 b6
1 1 1 0 1 1
test_df = spark.createDataFrame([
    (1, 1, 1, 0, 1, 1)
], ("b1", "b2", "b3", "b4", "b5", "b6"))
Here, the length of the longest consecutive run of 1s is 3. I have a large dataset of 1M rows, and I want to compute this value for each row.
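To make the target concrete: in the row above the runs of 1s are '111' and '11', so the expected value is 3. For a single row, the computation in plain Python would be something like:
max(len(run) for run in "111011".split("0"))  # 3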
My initial idea was to make a new column that concatenates the values of all the columns, so I went about it this way. First, I concatenated all of the column values into a new column:
test_df = test_df.withColumn('Ind', F.concat(*cols))
I got the dataframe like this:
b1 b2 b3 b4 b5 b6 ind
1 1 1 0 1 1 '111011'
Then I wrote a separate function:
def findMaxConsecutiveOnes(X) -> int:
    nums = [int(j) for j in X]
    count = 0
    maxCount = 0
    for idx, num in enumerate(nums):
        if num == 1:
            count += 1
        if num == 0 or idx == len(nums) - 1:
            maxCount = max(maxCount, count)
            count = 0
    return maxCount
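Checking the function directly on a sample string gives the expected result:
print(findMaxConsecutiveOnes("111011"))  # 3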
Then I created a UDF from it:
maxcon_udf = udf(lambda x: findMaxConsecutiveOnes(x))
and finally,
test_df = test_df.withColumn('final', maxcon_udf('ind'))
However, this throws an error. Can someone please help me solve this problem?
CodePudding user response:
After adding cols = test_df.columns and indenting the body of the findMaxConsecutiveOnes function properly, your code works.
Yet, I would recommend avoiding UDFs in PySpark when you can, because executing Python code is quite expensive in Spark: every row has to be serialized, sent to a Python worker process, and the result sent back to the JVM.
You could solve your problem with Spark functions by:
- splitting ind over anything that is not a one,
- computing the length of each resulting substring,
- getting the max of those lengths.
import pyspark.sql.functions as F

cols = test_df.columns
test_df \
    .withColumn('Ind', F.concat(*cols)) \
    .withColumn("final", F.array_max(F.transform(F.split("ind", "[^1]+"), F.length))) \
    .show()
+---+---+---+---+---+---+------+-----+
| b1| b2| b3| b4| b5| b6|   Ind|final|
+---+---+---+---+---+---+------+-----+
|  1|  1|  1|  0|  1|  1|111011|    3|
+---+---+---+---+---+---+------+-----+
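A side note: the Python-level F.transform helper was only added in PySpark 3.1. On Spark 2.4+ the same higher-order function is available through a SQL expression, so an equivalent sketch for older versions would be:
test_df \
    .withColumn("Ind", F.concat(*cols)) \
    .withColumn("final", F.expr("array_max(transform(split(Ind, '[^1]+'), x -> length(x)))")) \
    .show()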
CodePudding user response:
Your code works just fine; maybe you forgot to define cols as test_df.columns:
from pyspark.sql import SparkSession
from pyspark.sql.functions import concat, udf

spark = SparkSession.builder.master("local[*]").getOrCreate()

test_df = spark.createDataFrame([
    (1, 1, 1, 0, 1, 1)
], ("b1", "b2", "b3", "b4", "b5", "b6"))

cols = test_df.columns
test_df = test_df.withColumn('Ind', concat(*cols))
def findMaxConsecutiveOnes(X) -> int:
    nums = [int(j) for j in X]
    count = 0
    maxCount = 0
    for idx, num in enumerate(nums):
        if num == 1:
            count += 1
        if num == 0 or idx == len(nums) - 1:
            maxCount = max(maxCount, count)
            count = 0
    return maxCount
maxcon_udf = udf(lambda x: findMaxConsecutiveOnes(x))
test_df.withColumn('final', maxcon_udf('ind')).show()
+---+---+---+---+---+---+------+-----+
| b1| b2| b3| b4| b5| b6|   Ind|final|
+---+---+---+---+---+---+------+-----+
|  1|  1|  1|  0|  1|  1|111011|    3|
+---+---+---+---+---+---+------+-----+
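One caveat with this approach: udf created without an explicit return type produces a StringType column, so final above is the string '3' rather than an integer. If the type matters downstream, declare it when creating the UDF:
from pyspark.sql.types import IntegerType

maxcon_udf = udf(findMaxConsecutiveOnes, IntegerType())
test_df.withColumn('final', maxcon_udf('Ind')).show()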