pyspark filter columns values based on a list of list values


I wish to filter the values for multiple columns based on a list of values.

For example, I have a list of values as below:

list_example = [[10, "test", 100],
                [20, "test2", 50],
                [30, "test5", 100],
                [40, "test6", 200],
                ...]

My dataframe in pyspark is as follows (this is just a small example; the real dataframe is much larger):

data = {"Value": [10,50,70,80, 88, 99, 40],
       "String": ["test", "other", "string", "are", "in", "test", "test6"],
       "total": [100,100,300,500,600,111, 200]}

My current method:

for l in list_example:
    df.filter((F.col("Value").isin(l[0])) & (F.col("String").isin(l[1])) & (F.col("total").isin(l[2]))).show()

My issue: I don't think looping over each list and filtering one at a time is the right approach; it is too time consuming.

  1. My list_example contains more than a thousand lists of values; len(list_example) returns more than 2000.
  2. My data contains millions of rows.
  3. When I loop with for l in list_example and filter the data for each list, it takes very long.

I am wondering whether there is a way to filter the values based on the whole set of list values and then, once the matches are found, combine them all and show them, or save them in another dataframe?

My condition is as below:

(value in <val> AND string in <str> AND total in <tot>)

So in order for a row to pass the filter, it needs to satisfy <val>, <str>, and <tot>; using AND is my intention.

CodePudding user response:

You can create a filter condition using Python's map() and reduce() functions.

from functools import reduce
from pyspark.sql import functions as func

# OR together one (Value AND String AND total) equality clause per triplet in list_example
filter_cond = reduce(lambda a, b: a | b,
                     map(lambda x: ((func.col('Value') == x[0]) & (func.col('String') == x[1]) & (func.col('total') == x[2])),
                         list_example
                         )
                     )

filtered_data_sdf = data_sdf.filter(filter_cond)

The filter_cond variable is a single Column expression that can be passed to filter(). Using your example data:

data_sdf = spark.createDataFrame(pd.DataFrame(data))

# +-----+------+-----+
# |Value|String|total|
# +-----+------+-----+
# |   10|  test|  100|
# |   50| other|  100|
# |   70|string|  300|
# |   80|   are|  500|
# |   88|    in|  600|
# |   99|  test|  111|
# |   40| test6|  200|
# +-----+------+-----+

data_sdf. \
    filter(filter_cond). \
    show()

# +-----+------+-----+
# |Value|String|total|
# +-----+------+-----+
# |   10|  test|  100|
# |   40| test6|  200|
# +-----+------+-----+
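
As a side note, with 2000+ triplets, chaining that many OR clauses can slow down query planning. A sketch of an alternative (assuming the same spark, data_sdf and list_example objects as above; keys_sdf and matched_sdf are hypothetical names) is to put the triplets into a small DataFrame and do a broadcast left-semi join on the three columns:

from pyspark.sql import functions as func

# keys_sdf holds the [Value, String, total] triplets to keep
keys_sdf = spark.createDataFrame(list_example, schema=['Value', 'String', 'total'])

# keep only the rows of data_sdf whose (Value, String, total) combination appears in keys_sdf
matched_sdf = data_sdf.join(func.broadcast(keys_sdf), on=['Value', 'String', 'total'], how='left_semi')
matched_sdf.show()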