Pyspark scan column against other column or list


Given the example dataframe:

+---+---------------+
| id|            log|
+---+---------------+
|  1|Test logX blk_A|
|  2|Test logV blk_B|
|  3|Test logF blk_D|
|  4|Test logD blk_F|
|  5|Test logB blk_K|
|  6|Test logY blk_A|
|  7|Test logE blk_C|
+---+---------------+

I'm trying to label it by comparing the log with a list (or a df column, I can convert it easily) of the blocks tagged as anomalous. This means that I need to scan each log line against this list and add the label column.

Given the list:

anomalous_blocks = ['blk_A','blk_C','blk_D']

The expected resulting dataframe would be:

+---+---------------+-----+
| id|            log|Label|
+---+---------------+-----+
|  1|Test logX blk_A| True|
|  2|Test logV blk_B|False|
|  3|Test logF blk_D| True|
|  4|Test logD blk_F|False|
|  5|Test logB blk_K|False|
|  6|Test logY blk_A| True|
|  7|Test logE blk_C| True|
+---+---------------+-----+

I tried to think of and look for a solution in SQL or Spark that could accomplish this, but came up short.
I thought of using a udf (user-defined function) like this:

from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

def check_anomaly(text, anomalies):
    # Return True if any anomalous block id appears in the log line
    for a in anomalies:
        if a in text:
            return True
    return False

anomaly_matchUDF = udf(check_anomaly, BooleanType())

But it takes way too long and doesn't seem like the proper way to go about this.
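A rough sketch of how such a UDF would get applied (the exact invocation isn't shown above; the list has to be wrapped as an array of literal columns to reach the UDF):

import pyspark.sql.functions as F

# Sketch only: wrap the Python list as an array of literal columns so it
# can be passed into the UDF alongside the log column.
anomalies_col = F.array(*[F.lit(b) for b in anomalous_blocks])

df_labeled = df.withColumn("Label", anomaly_matchUDF(F.col("log"), anomalies_col))

Every row is shipped to a Python worker together with the whole list, which is part of why this approach is slow.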

Any suggestion would be greatly appreciated.

EDIT:
For clarity, the size of the list is much smaller than the number of rows/logs. In other words, given N log lines and a list of M blocks tagged as anomalous:

N >> M

EDIT2:
Updated the df to represent the real situation more accurately.

CodePudding user response:

You could use the like or contains operator and create a chain of conditions using reduce.

from functools import reduce
import pyspark.sql.functions as func

anomalous_blocks = ['blk_A','blk_C','blk_D']
label_condition = reduce(lambda a, b: a | b, 
                         [func.col('log').like('%' + k + '%') for k in anomalous_blocks]
                         )
# Column<'((log LIKE %blk_A% OR log LIKE %blk_C%) OR log LIKE %blk_D%)'>

data_sdf. \
    withColumn('label', label_condition). \
    show()

# +---+---------------+-----+
# | id|            log|label|
# +---+---------------+-----+
# |  1|Test logX blk_A| true|
# |  2|Test logV blk_B|false|
# |  3|Test logF blk_D| true|
# |  4|Test logD blk_F|false|
# |  5|Test logB blk_K|false|
# |  6|Test logY blk_A| true|
# |  7|Test logE blk_C| true|
# +---+---------------+-----+
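An equivalent way to write the same chained condition is a single regex with rlike. Just a sketch, assuming the block ids contain no regex metacharacters (otherwise escape them with re.escape first):

import pyspark.sql.functions as func

# Sketch: one regex alternation that matches any of the anomalous block ids
label_condition = func.col('log').rlike('|'.join(anomalous_blocks))

data_sdf.withColumn('label', label_condition).show()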

CodePudding user response:

You can use the isin method on a pyspark.sql.Column to achieve this without needing UDFs (notice that I adapted the contents of your anomalous_blocks list slightly so they match the df's contents exactly; this should be really cheap since you said N >> M):

df = spark.createDataFrame(
    [
        (1, "Test log blk_A"),
        (2, "Test log blk_B"),
        (3, "Test log blk_D"),
        (4, "Test log blk_F"),
        (5, "Test log blk_K"),
        (6, "Test log blk_A"),
        (7, "Test log blk_C")
    ],
    ["id", "log"]
)

anomalous_blocks = ['blk_A','blk_C','blk_D']

# Solution starts here
adapted_anomalous_blocks = ["Test log " + x for x in anomalous_blocks]
output = df.withColumn("Label", df.log.isin(adapted_anomalous_blocks))
output.show()

+---+--------------+-----+
| id|           log|Label|
+---+--------------+-----+
|  1|Test log blk_A| true|
|  2|Test log blk_B|false|
|  3|Test log blk_D| true|
|  4|Test log blk_F|false|
|  5|Test log blk_K|false|
|  6|Test log blk_A| true|
|  7|Test log blk_C| true|
+---+--------------+-----+
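If the log text varies as in the updated df from the question (Test logX, Test logV, ...), the prefixed exact-match list above won't line up anymore. Here is a sketch of the same isin idea that first extracts the block token, assuming the block id is always the last whitespace-separated token:

import pyspark.sql.functions as F

# Sketch: pull out the last whitespace-separated token (the block id)
# and check it against the list directly.
output = df.withColumn(
    "Label",
    F.substring_index(F.col("log"), " ", -1).isin(anomalous_blocks)
)
output.show()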