Given the example dataframe:
+---+---------------+
| id|            log|
+---+---------------+
|  1|Test logX blk_A|
|  2|Test logV blk_B|
|  3|Test logF blk_D|
|  4|Test logD blk_F|
|  5|Test logB blk_K|
|  6|Test logY blk_A|
|  7|Test logE blk_C|
+---+---------------+
I'm trying to label it by comparing the log with a list (or df column, I can convert it easily) of the blocks tagged as anomalous. This means I need to scan each log line against this list and add a label column.
Given the list:
anomalous_blocks = ['blk_A','blk_C','blk_D']
The expected resulting dataframe would be:
+---+---------------+-----+
| id|            log|Label|
+---+---------------+-----+
|  1|Test logX blk_A| True|
|  2|Test logV blk_B|False|
|  3|Test logF blk_D| True|
|  4|Test logD blk_F|False|
|  5|Test logB blk_K|False|
|  6|Test logY blk_A| True|
|  7|Test logE blk_C| True|
+---+---------------+-----+
I looked for a solution in SQL or Spark that could accomplish this, but came up short.
I thought of using a UDF (user-defined function) like this:
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

def check_anomaly(text, anomalies):
    for a in anomalies:
        if a in text:
            return True
    return False

anomaly_matchUDF = udf(lambda x, y: check_anomaly(x, y), BooleanType())
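Applied roughly like this (a sketch, assuming the dataframe is called df; the exact call may differ):
# Sketch of applying the UDF: anomalous_blocks is captured in a closure,
# so every row is shipped to a Python worker just for the membership test.
check_udf = udf(lambda text: check_anomaly(text, anomalous_blocks), BooleanType())
df = df.withColumn("Label", check_udf(df.log))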
But it takes way too long and doesn't seem the proper way to go about this.
Any suggestion would be greatly appreciated.
EDIT:
For clarity: the size of the list is much smaller than the number of rows/logs.
In other words, given N log lines and a list of M blocks tagged as anomalous
N >> M
EDIT2:
Updated the df to represent the real situation more accurately.
CodePudding user response:
You could use the like or contains operator and create a chain of conditions using reduce.
from functools import reduce
from pyspark.sql import functions as func

anomalous_blocks = ['blk_A', 'blk_C', 'blk_D']

label_condition = reduce(lambda a, b: a | b,
                         [func.col('log').like('%' + k + '%') for k in anomalous_blocks]
                         )
# Column<'((log LIKE %blk_A% OR log LIKE %blk_C%) OR log LIKE %blk_D%)'>
data_sdf. \
    withColumn('label', label_condition). \
    show()
# +---+---------------+-----+
# | id|            log|label|
# +---+---------------+-----+
# |  1|Test logX blk_A| true|
# |  2|Test logV blk_B|false|
# |  3|Test logF blk_D| true|
# |  4|Test logD blk_F|false|
# |  5|Test logB blk_K|false|
# |  6|Test logY blk_A| true|
# |  7|Test logE blk_C| true|
# +---+---------------+-----+
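The contains variant mentioned above would be a drop-in replacement for the like condition (same imports assumed):
# same chain of conditions, using contains instead of like
label_condition = reduce(lambda a, b: a | b,
                         [func.col('log').contains(k) for k in anomalous_blocks])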
CodePudding user response:
You can use the isin method on a pyspark.sql.Column to achieve this without needing UDFs (notice that I adapted your anomalous_blocks list slightly so the entries match the df's contents exactly; this should be really cheap since you said N >> M):
df = spark.createDataFrame(
    [
        (1, "Test log blk_A"),
        (2, "Test log blk_B"),
        (3, "Test log blk_D"),
        (4, "Test log blk_F"),
        (5, "Test log blk_K"),
        (6, "Test log blk_A"),
        (7, "Test log blk_C")
    ],
    ["id", "log"]
)
anomalous_blocks = ['blk_A','blk_C','blk_D']
# Solution starts here
adapted_anomalous_blocks = ["Test log " + x for x in anomalous_blocks]
output = df.withColumn("Label", df.log.isin(adapted_anomalous_blocks))
output.show()
+---+--------------+-----+
| id|           log|Label|
+---+--------------+-----+
|  1|Test log blk_A| true|
|  2|Test log blk_B|false|
|  3|Test log blk_D| true|
|  4|Test log blk_F|false|
|  5|Test log blk_K|false|
|  6|Test log blk_A| true|
|  7|Test log blk_C| true|
+---+--------------+-----+
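If the log lines don't match the adapted list exactly (as with the updated df from EDIT2, where the lines look like "Test logX blk_A"), one sketch of keeping the isin approach would be to pull out the block id first with regexp_extract; the blk_\w+ pattern below is an assumption about what the block ids look like:
from pyspark.sql import functions as F

# Extract the block id from each log line, then test membership directly;
# the pattern blk_\w+ assumes block ids are always of that shape.
output = df.withColumn(
    "Label",
    F.regexp_extract("log", r"(blk_\w+)", 1).isin(anomalous_blocks)
)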