Compare two arrays from two different dataframes in Pyspark

Time:01-07

I have two dataframes, each with an array&lt;string&gt; column.

I am trying to create a new dataframe that keeps only the rows where at least one element of one array matches an element of the other.

# first dataframe
main_df = spark.createDataFrame([('1', ['YYY', 'MZA']),
                                 ('2', ['XXX', 'YYY']),
                                 ('3', ['QQQ']),
                                 ('4', ['RRR', 'ZZZ', 'BBB1'])],
                                ('No', 'refer_array_col'))

# second dataframe
df = spark.createDataFrame([('1A', '3412asd', 'value-1', ['XXX', 'YYY', 'AAA']),
                            ('2B', '2345tyu', 'value-2', ['DDD', 'YFFFYY', 'GGG', '1']),
                            ('3C', '9800bvd', 'value-3', ['AAA']),
                            ('3C', '9800bvd', 'value-1', ['AAA', 'YYY', 'CCCC'])],
                           ('ID', 'Company_Id', 'value', 'array_column'))

df.show()

+---+----------+-------+---------------------+
| ID|Company_Id|  value|         array_column|
+---+----------+-------+---------------------+
| 1A|   3412asd|value-1|      [XXX, YYY, AAA]|
| 2B|   2345tyu|value-2|[DDD, YFFFYY, GGG, 1]|
| 3C|   9800bvd|value-3|                [AAA]|
| 3C|   9800bvd|value-1|     [AAA, YYY, CCCC]|
+---+----------+-------+---------------------+

Code I tried:

The main idea is to use rdd.toLocalIterator(), because some other functions inside the same for loop depend on these filters.

for x in main_df.rdd.toLocalIterator():
    a = x["refer_array_col"]   # a Python list of strings
    b = x["No"]
    some_x_filter = F.col('array_column').isin(b)

    final_df = df.filter(
        # filter 1
        some_x_filter &
        # second filter is to compare 'a' with array_column - I tried using F.array_contains
        F.array_contains(F.col('array_column'), F.lit(a))
    )
  • some_x_filter is working as expected
  • some_x_filter compares a string value against an array-of-strings column
  • But a contains a list of strings, and I am unable to compare it with array_column the same way

With my code I am getting an error from array_contains.

Error

py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.sql.functions.lit.
: java.lang.RuntimeException: Unsupported literal type class java.util.ArrayList ['YYY', 'MZA']

Can anyone tell me what I can use for the second filter instead?

CodePudding user response:

From what I understood based on our conversation in the comments:

essentially, your requirement is to compare an array column with a Python list.

Thus, this would do the job

df.withColumn("asArray", F.array(*[F.lit(x) for x in b]))