Pyspark reduce function causes StackOverflowError


I'm working with a fairly big dataframe (around 100 thousand rows, with the intent to scale to 10 million) and it has the following structure:

+------+--------------------+--------+--------------------+-------------------+
|LineId|             Content| EventId|       EventTemplate|          Timestamp|
+------+--------------------+--------+--------------------+-------------------+
|     1|Receiving block b...|ef6f4915|Receiving block <...|2009-11-08 20:35:18|
|     2|BLOCK* NameSystem...|9bc09482|BLOCK* NameSystem...|2009-11-08 20:35:18|
|     3|Receiving block b...|9ca53bce|Receiving block <...|2009-11-08 20:35:19|
+------+--------------------+--------+--------------------+-------------------+

I'd like to add a label and I'm using the following function to do so:

from functools import reduce
label_condition = reduce(lambda a, b: a | b, (df['Content'].like('%' + pat + '%') for pat in blocks))

where blocks is a list containing the block IDs (let's call them tokens) that define whether or not a row is anomalous. This expression checks if the Content field contains any value from the blocks list.
The size of this list is around 17k, which is what I think is causing the problem.
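To illustrate why the plan gets so deep: the reduce builds one big, left-nested boolean expression, roughly equivalent to the following (the token names here are made up):

# illustrative only -- the reduce expands to a left-nested chain of ORs
label_condition = (
    (
        (df['Content'].like('%blk_A%') | df['Content'].like('%blk_B%'))
        | df['Content'].like('%blk_C%')
    )
    | df['Content'].like('%blk_D%')
)
# with ~17k tokens the expression tree is ~17k levels deep, and Catalyst
# walks it recursively when it transforms or prints the plan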

When I try to add this to the dataframe, or simply evaluate this operation, I get the following error:

Py4JJavaError: An error occurred while calling o50581.toString.
: java.lang.StackOverflowError
    at org.apache.spark.sql.catalyst.util.package$$anonfun$usePrettyExpression$1.applyOrElse(package.scala:128)
    at org.apache.spark.sql.catalyst.util.package$$anonfun$usePrettyExpression$1.applyOrElse(package.scala:128)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:318)
...

Looking online I saw that this might be caused by Spark executing an overly complex plan, and that checkpointing can help avoid this sort of thing, but I'm not sure how to go about it. I tried adding a checkpoint before evaluating this, and I also tried using a select to reduce the df to just the 'Content' column, but to no avail.
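For reference, my checkpoint attempt looked roughly like this (the checkpoint directory is just a placeholder):

# rough sketch of what I tried; the directory path is a placeholder
spark.sparkContext.setCheckpointDir('/tmp/spark-checkpoints')
df = df.checkpoint()  # truncates the dataframe's lineage, but label_condition is still one huge OR tree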

I found this solution in Scala to optimize the reduce function, but I don't know how to translate it to Python.

Is there a way to optimize this, or at least make it run step by step or iteratively, to avoid the stack overflow?

Thanks in advance.

CodePudding user response:

You could try the rlike method, which accepts a regex. Pass the pattern as 'element1|element2|...'.

from pyspark.sql import functions as func

data_sdf. \
    withColumn('label', func.col('log').rlike('|'.join(anomalous_blocks))). \
    show()

# +---+---------------+-----+
# | id|            log|label|
# +---+---------------+-----+
# |  1|Test logX blk_A| true|
# |  2|Test logV blk_B|false|
# |  3|Test logF blk_D| true|
# |  4|Test logD blk_F|false|
# |  5|Test logB blk_K|false|
# |  6|Test logY blk_A| true|
# |  7|Test logE blk_C| true|
# +---+---------------+-----+
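
One caveat, in case any of the block tokens contain regex metacharacters: you may want to escape them before joining, for example with re.escape (a sketch, reusing the same data_sdf and anomalous_blocks names):

import re
from pyspark.sql import functions as func

# escape each token so it is matched literally inside the alternation
pattern = '|'.join(re.escape(b) for b in anomalous_blocks)

data_sdf. \
    withColumn('label', func.col('log').rlike(pattern)). \
    show()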