Pyspark scan column against other column or list


Given the example dataframe:

+---+---------------+
| id|            log|
+---+---------------+
|  1|Test logX blk_A|
|  2|Test logV blk_B|
|  3|Test logF blk_D|
|  4|Test logD blk_F|
|  5|Test logB blk_K|
|  6|Test logY blk_A|
|  7|Test logE blk_C|
+---+---------------+

I'm trying to label each row by comparing its log with a list (or a df column; I can convert between the two easily) of the blocks tagged as anomalous. This means I need to scan each log line against this list and add a label column.

Given the list:

anomalous_blocks = ['blk_A','blk_C','blk_D']

The expected resulting dataframe would be:

+---+---------------+-----+
| id|            log|Label|
+---+---------------+-----+
|  1|Test logX blk_A| True|
|  2|Test logV blk_B|False|
|  3|Test logF blk_D| True|
|  4|Test logD blk_F|False|
|  5|Test logB blk_K|False|
|  6|Test logY blk_A| True|
|  7|Test logE blk_C| True|
+---+---------------+-----+
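
To make the example reproducible, here is the updated dataframe as plain Python data, together with the labeling rule written out as a simple substring check. This is only a reference for the expected labels, not a Spark solution; the names ROWS, ANOMALOUS_BLOCKS, expected_label and make_example_df are hypothetical, and make_example_df assumes you pass in an existing SparkSession.

```python
# Rows of the (updated) example dataframe
ROWS = [
    (1, "Test logX blk_A"),
    (2, "Test logV blk_B"),
    (3, "Test logF blk_D"),
    (4, "Test logD blk_F"),
    (5, "Test logB blk_K"),
    (6, "Test logY blk_A"),
    (7, "Test logE blk_C"),
]
ANOMALOUS_BLOCKS = ["blk_A", "blk_C", "blk_D"]


def expected_label(log, blocks):
    """Reference rule: a row is anomalous if any listed block ID occurs in its log."""
    return any(block in log for block in blocks)


def make_example_df(spark):
    """Build the example as a Spark DataFrame; `spark` is an existing SparkSession."""
    return spark.createDataFrame(ROWS, ["id", "log"])


if __name__ == "__main__":
    # Print each row with the label the rule assigns to it
    for row_id, log in ROWS:
        print(row_id, log, expected_label(log, ANOMALOUS_BLOCKS))
```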

I looked for a solution in SQL or Spark that could accomplish this, but came up short.
I thought of using a UDF (user-defined function) like this:

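from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

anomalous_blocks = ['blk_A','blk_C','blk_D']

def check_anomaly(text, anomalies):
  for a in anomalies:
    if a in text:
      return True
  return False

# UDF arguments must be columns, so the Python list is captured by the closure instead
anomaly_matchUDF = udf(lambda x: check_anomaly(x, anomalous_blocks), BooleanType())
df = df.withColumn('Label', anomaly_matchUDF('log'))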

But it takes far too long and doesn't seem like the proper way to go about this.

Any suggestion would be greatly appreciated.

For clarity, the list is much smaller than the number of rows/logs. In other words, given N log lines and a list of M blocks tagged as anomalous:

N >> M

Edit: updated the df to more accurately represent the real situation.

Answer:

You could use the like or contains operator and build a chain of conditions with reduce:

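from functools import reduce
import pyspark.sql.functions as func

anomalous_blocks = ['blk_A','blk_C','blk_D']
# OR together one LIKE '%block%' condition per anomalous block
label_condition = reduce(lambda a, b: a | b,
                         [func.col('log').like('%' + k + '%') for k in anomalous_blocks])
# Column<'((log LIKE %blk_A% OR log LIKE %blk_C%) OR log LIKE %blk_D%)'>

data_sdf. \
    withColumn('label', label_condition). \
    show()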

# +---+---------------+-----+
# | id|            log|label|
# +---+---------------+-----+
# |  1|Test logX blk_A| true|
# |  2|Test logV blk_B|false|
# |  3|Test logF blk_D| true|
# |  4|Test logD blk_F|false|
# |  5|Test logB blk_K|false|
# |  6|Test logY blk_A| true|
# |  7|Test logE blk_C| true|
# +---+---------------+-----+
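
Since M is small, a variation on this approach (my own sketch, not part of the answer above) is to fold the whole list into a single regular expression and use the built-in rlike, so the label stays one expression however many blocks there are, still with no Python UDF. The helper names are hypothetical. The pattern is built with Python's re.escape, on the assumption that its output is also valid for the Java regex engine Spark uses (it is for plain IDs like these). The \b word boundaries are an addition so that blk_A does not also match a block such as blk_AB, which a '%blk_A%' LIKE would match.

```python
import re

ANOMALOUS_BLOCKS = ["blk_A", "blk_C", "blk_D"]


def build_block_pattern(blocks):
    """Join the block IDs into one alternation anchored on word boundaries.

    re.escape guards against regex metacharacters in the IDs; for plain IDs
    like these the result is assumed to be valid for Spark's Java regex too.
    """
    return r"\b(?:" + "|".join(re.escape(b) for b in blocks) + r")\b"


def label_with_rlike(df, blocks, col_name="log"):
    """Add a boolean 'Label' column using Spark's built-in rlike (no Python UDF)."""
    from pyspark.sql import functions as F  # imported lazily; needs pyspark installed

    return df.withColumn("Label", F.col(col_name).rlike(build_block_pattern(blocks)))


# Local sanity check of the pattern with Python's re module (no Spark needed)
if __name__ == "__main__":
    pattern = build_block_pattern(ANOMALOUS_BLOCKS)
    print(pattern)  # \b(?:blk_A|blk_C|blk_D)\b
    for log in ["Test logX blk_A", "Test logV blk_B", "Test logZ blk_AB"]:
        print(log, bool(re.search(pattern, log)))
```

For this data, label_with_rlike(data_sdf, anomalous_blocks).show() should give the same true/false values as the table above.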

Answer:

You can use the isin method on a pyspark.sql.Column to achieve this without needing a UDF. Note that isin checks for exact equality, so I prefixed each entry of your anomalous_blocks list with "Test log " to match the df's contents exactly (this uses the df from before the question was updated). This should be really cheap since you said N >> M:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [
        (1, "Test log blk_A"),
        (2, "Test log blk_B"),
        (3, "Test log blk_D"),
        (4, "Test log blk_F"),
        (5, "Test log blk_K"),
        (6, "Test log blk_A"),
        (7, "Test log blk_C"),
    ],
    ["id", "log"],
)

anomalous_blocks = ['blk_A','blk_C','blk_D']

# Solution starts here
# isin() tests exact equality, so rebuild the full log strings first
adapted_anomalous_blocks = ["Test log " + x for x in anomalous_blocks]
output = df.withColumn("Label", df.log.isin(adapted_anomalous_blocks))
output.show()

+---+--------------+-----+
| id|           log|Label|
+---+--------------+-----+
|  1|Test log blk_A| true|
|  2|Test log blk_B|false|
|  3|Test log blk_D| true|
|  4|Test log blk_F|false|
|  5|Test log blk_K|false|
|  6|Test log blk_A| true|
|  7|Test log blk_C| true|
+---+--------------+-----+
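
Because isin only matches whole values, it cannot be applied directly to the updated df, where the text before the block ID varies (logX, logV, ...). One option, sketched here and not part of the original answer, is to extract the block ID first with regexp_extract and then call isin on the extracted value. This assumes every block ID looks like blk_ followed by word characters, as in the example; BLOCK_ID_REGEX and the helper names are hypothetical. extract_block_id is only a local approximation of the Spark expression (Python's \w is Unicode-aware, Java's is ASCII by default), useful for checking the regex without a cluster.

```python
import re

# Assumption: block IDs look like "blk_" followed by word characters
BLOCK_ID_REGEX = r"(blk_\w+)"


def extract_block_id(log):
    """Local approximation of regexp_extract(log, BLOCK_ID_REGEX, 1).

    Like Spark's regexp_extract, returns an empty string when nothing matches.
    """
    match = re.search(BLOCK_ID_REGEX, log)
    return match.group(1) if match else ""


def label_with_isin(df, blocks, col_name="log"):
    """Extract the block ID, then compare it exactly against the list with isin."""
    from pyspark.sql import functions as F  # imported lazily; needs pyspark installed

    block_id = F.regexp_extract(F.col(col_name), BLOCK_ID_REGEX, 1)
    return df.withColumn("Label", block_id.isin(blocks))
```

With the updated df, label_with_isin(df, anomalous_blocks) should mark rows 1, 3, 6 and 7 as true. Because isin compares whole values, a block such as blk_AB would not be counted as blk_A here, unlike with a '%blk_A%' substring match.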