I'm working with a fairly big dataframe (around 100 thousand rows, with the intent to eventually reach 10 million) and it has the following structure:
+------+--------------------+--------+--------------------+-------------------+
|LineId|             Content| EventId|       EventTemplate|          Timestamp|
+------+--------------------+--------+--------------------+-------------------+
|     1|Receiving block b...|ef6f4915|Receiving block <...|2009-11-08 20:35:18|
|     2|BLOCK* NameSystem...|9bc09482|BLOCK* NameSystem...|2009-11-08 20:35:18|
|     3|Receiving block b...|9ca53bce|Receiving block <...|2009-11-08 20:35:19|
+------+--------------------+--------+--------------------+-------------------+
I'd like to add a label and I'm using the following function to do so:
from functools import reduce
label_condition = reduce(lambda a, b: a | b, (df['Content'].like('%' + pat + '%') for pat in blocks))
where blocks is a list containing the blocks (let's call them tokens) that define whether or not a row is anomalous. This function checks whether the Content field contains any value from the blocks list.
The size of this list is around 17k, which is what I think is causing the problem.
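For reference, here is a minimal, self-contained sketch of what I'm doing (the rows and block tokens below are made up for illustration, not my real data):

from functools import reduce
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# hypothetical miniature version of my dataframe and blocks list
df = spark.createDataFrame(
    [(1, 'Receiving block blk_A src: ...'),
     (2, 'BLOCK* NameSystem.allocateBlock: blk_B ...')],
    ['LineId', 'Content'])
blocks = ['blk_A', 'blk_C']

# chain of OR'd like() conditions, one per token
label_condition = reduce(lambda a, b: a | b,
                         (df['Content'].like('%' + pat + '%') for pat in blocks))
df.withColumn('label', label_condition).show()

With the full list this builds a single expression of roughly 17k nested ORs, which is presumably what Spark is choking on.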
When I try to add this to the dataframe, or simply evaluate this operation, I get the following error:
Py4JJavaError: An error occurred while calling o50581.toString.
: java.lang.StackOverflowError
at org.apache.spark.sql.catalyst.util.package$$anonfun$usePrettyExpression$1.applyOrElse(package.scala:128)
at org.apache.spark.sql.catalyst.util.package$$anonfun$usePrettyExpression$1.applyOrElse(package.scala:128)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:318)
...
Looking online I saw that this might be caused by Spark building an overly complex execution plan, and that a checkpoint can be used to avoid this sort of thing, but I'm not sure how to go about it. I tried adding a checkpoint before evaluating this, and I also tried using a select to reduce the df to just the 'Content' column, but to no avail.
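The checkpoint attempt looked roughly like this (a sketch; the checkpoint directory is just a placeholder, and df/blocks are as in the sketch above):

# placeholder checkpoint directory
spark.sparkContext.setCheckpointDir('/tmp/checkpoints')

# truncate the lineage before building the condition
content_df = df.select('Content').checkpoint()
label_condition = reduce(lambda a, b: a | b,
                         (content_df['Content'].like('%' + pat + '%') for pat in blocks))
content_df.withColumn('label', label_condition).show()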
I found this solution in Scala to optimize the reduce function, but I don't know how to translate it to Python.
Is there a way to optimize this, or at least make it run step by step or iteratively, to avoid the stack overflow?
Thanks in advance.
CodePudding user response:
You could try using the rlike method, which accepts a regex - pass the regex pattern as 'element1|element2|...'.
from pyspark.sql import functions as func

# flag rows whose log contains any of the anomalous block tokens
data_sdf. \
    withColumn('label', func.col('log').rlike('|'.join(anomalous_blocks))). \
    show()
# +---+---------------+-----+
# | id|            log|label|
# +---+---------------+-----+
# | 1|Test logX blk_A| true|
# | 2|Test logV blk_B|false|
# | 3|Test logF blk_D| true|
# | 4|Test logD blk_F|false|
# | 5|Test logB blk_K|false|
# | 6|Test logY blk_A| true|
# | 7|Test logE blk_C| true|
# +---+---------------+-----+
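One thing to keep in mind (an assumption about your tokens, not something shown in your question): if the block tokens can contain regex metacharacters, it may be safer to escape them before joining them into the pattern, for example:

import re
from pyspark.sql import functions as func

# escape any regex metacharacters in the tokens before building the pattern
pattern = '|'.join(re.escape(tok) for tok in anomalous_blocks)

data_sdf. \
    withColumn('label', func.col('log').rlike(pattern)). \
    show()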