How do I filter a column in PySpark?


I am new to PySpark. I want to compare two tables. If the value in one of the columns does not match, I want to record that column's name in a new column. Using the Compare two dataframes Pyspark link, I am able to get that result. Now I want to filter the new table based on the newly created column.

df1 = spark.createDataFrame([
  [1, "ABC", 5000, "US"],
  [2, "DEF", 4000, "UK"],
  [3, "GHI", 3000, "JPN"],
  [4, "JKL", 4500, "CHN"]
], ["id", "name", "sal", "Address"])

df2 = spark.createDataFrame([
  [1, "ABC", 5000, "US"],
  [2, "DEF", 4000, "CAN"],
  [3, "GHI", 3500, "JPN"],
  [4, "JKL_M", 4800, "CHN"]
], ["id", "name", "sal", "Address"])


from pyspark.sql.functions import col, lit, array, when, array_remove

# for each column except id, emit the column name where the values differ, otherwise ""
conditions_ = [when(df1[c] != df2[c], lit(c)).otherwise("") for c in df1.columns if c != 'id']

# select id, df2's values, and the array of differing column names (empty strings removed)
select_expr = [
    col("id"),
    *[df2[c] for c in df2.columns if c != 'id'],
    array_remove(array(*conditions_), "").alias("column_names")
]

df3 = df1.join(df2, "id").select(*select_expr)
df3.show()

DF3:

+---+-----+----+-------+------------+
| id| name| sal|Address|column_names|
+---+-----+----+-------+------------+
|  1|  ABC|5000|     US|          []|
|  2|  DEF|4000|    CAN|   [Address]|
|  3|  GHI|3500|    JPN|       [sal]|
|  4|JKL_M|4800|    CHN| [name, sal]|
+---+-----+----+-------+------------+

This is the step where I am getting an error message.

df3.filter(df3.column_names!="")

Error: cannot resolve '(column_names = '')' due to data type mismatch: differing types in '(column_names = '')' (array<string> and string).

I want the following result:

DF3:

+---+-----+----+-------+------------+
| id| name| sal|Address|column_names|
+---+-----+----+-------+------------+
|  2|  DEF|4000|    CAN|   [Address]|
|  3|  GHI|3500|    JPN|       [sal]|
|  4|JKL_M|4800|    CHN| [name, sal]|
+---+-----+----+-------+------------+

CodePudding user response:

You can create a UDF that checks the array length and pass the relevant column to it. I hope the code below helps.

from pyspark.sql.functions import col, udf
from pyspark.sql.types import BooleanType

# simple filter function: keep rows whose column_names array is non-empty
@udf(returnType=BooleanType())
def my_filter(col1):
  return len(col1) > 0

df3.filter(my_filter(col('column_names'))).show()
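
If you would rather avoid a Python UDF, the built-in size function should give the same filter:

from pyspark.sql.functions import col, size

# size() returns the length of the array, so > 0 keeps the non-empty rows
df3.filter(size(col('column_names')) > 0).show()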

CodePudding user response:

Another way

from pyspark.sql.functions import array, array_except, col, countDistinct, lit, when

# Do an outer join on all columns; each value that differs adds an extra row per id

new = df1.join(df2.alias('df2'), how='outer', on=['id', 'name', 'sal', 'Address'])

# Count distinct values in each column per id

new1 = new.groupBy('id').agg(*[countDistinct(x).alias(x) for x in new.drop('id').columns])

# Using case when, where there is more than one distinct value, append the column name to the new column
new2 = new1.select(
    'id',
    array_except(
        array(*[when(col(c) != 1, lit(c)) for c in new1.drop('id').columns]),
        array(lit(None).cast('string'))
    ).alias('column_names')
)

# Join back to df2
df2.join(new2, how='right', on='id').show()


+---+-----+----+-------+------------+
| id| name| sal|Address|column_names|
+---+-----+----+-------+------------+
|  1|  ABC|5000|     US|          []|
|  2|  DEF|4000|    CAN|   [Address]|
|  3|  GHI|3500|    JPN|       [sal]|
|  4|JKL_M|4800|    CHN| [name, sal]|
+---+-----+----+-------+------------+
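
To see why this works: the outer join on all columns yields one row per distinct value combination for each id, so any column whose distinct count per id is greater than 1 differs between the two dataframes. The intermediate new1 should look like:

+---+----+---+-------+
| id|name|sal|Address|
+---+----+---+-------+
|  1|   1|  1|      1|
|  2|   1|  1|      2|
|  3|   1|  2|      1|
|  4|   2|  2|      1|
+---+----+---+-------+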

CodePudding user response:

You are getting the error because you are comparing an array type to a string. Convert the column_names array to a string first, then the comparison will work:

from pyspark.sql.functions import col, concat_ws
df3 = df3.withColumn('column_names', concat_ws(";", col("column_names")))
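
With the array flattened to a string (an empty array becomes an empty string), the original filter should now work as written:

df3.filter(df3.column_names != "").show()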

CodePudding user response:

Use filter('array_column != array()'). The example below filters out the empty arrays.

spark.sparkContext.parallelize([([],), (['blah', 'bleh'],)]).toDF(['arrcol']). \
    show()

# +------------+
# |      arrcol|
# +------------+
# |          []|
# |[blah, bleh]|
# +------------+

spark.sparkContext.parallelize([([],), (['blah', 'bleh'],)]).toDF(['arrcol']). \
    filter('arrcol != array()'). \
    show()

# +------------+
# |      arrcol|
# +------------+
# |[blah, bleh]|
# +------------+
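
Applied to df3 from the question, the same predicate should drop the rows with an empty column_names array:

df3.filter('column_names != array()').show()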