Pyspark: join dataframe as an array type column to another dataframe

Time:11-09

I'm trying to join two DataFrames in PySpark, but with one table joined onto the other as an array column.

For example, for these tables:

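from pyspark.sql import Row, SparkSession

# In the pyspark shell or a notebook `spark` already exists; otherwise create one
spark = SparkSession.builder.getOrCreate()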

df1 = spark.createDataFrame([
    Row(a = 1, b = 'C', c = 26, d = 'abc'),
    Row(a = 1, b = 'C', c = 27, d = 'def'),
    Row(a = 1, b = 'D', c = 51, d = 'ghi'),
    Row(a = 2, b = 'C', c = 40, d = 'abc'),
    Row(a = 2, b = 'D', c = 45, d = 'abc'),
    Row(a = 2, b = 'D', c = 38, d = 'def')
])

df2 = spark.createDataFrame([
    Row(a = 1, b = 'C', e = 2, f = 'cba'),
    Row(a = 1, b = 'D', e = 3, f = 'ihg'),
    Row(a = 2, b = 'C', e = 7, f = 'cba'),
    Row(a = 2, b = 'D', e = 9, f = 'cba')
])

I want to join df1 to df2 on columns a and b, but df1.c and df1.d should be combined into a single array-type column. All column names should also be retained. The resulting DataFrame should be translatable into this JSON structure (example for the first two rows of df1, which share a = 1 and b = 'C'):

{
    "a": 1,
    "b": "C",
    "e": 2,
    "f": "cba",
    "df1": [
        {
            "c": 26,
            "d": "abc"
        },
        {
            "c": 27,
            "d": "def"
        }
    ]
}

Any ideas on how to accomplish this would be very much appreciated!

Thanks,

Carolina

Answer:

Based on your sample input data:

First, aggregate df1 by a and b:

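from pyspark.sql import functions as F

# Group df1 by the join keys and collect each (c, d) pair as a struct,
# giving one array<struct<c, d>> column named "df1" per (a, b)
df1 = df1.groupBy("a", "b").agg(
    F.collect_list(F.struct(F.col("c"), F.col("d"))).alias("df1")
)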

df1.show()
+---+---+--------------------+
|  a|  b|                 df1|
+---+---+--------------------+
|  1|  C|[[26, abc], [27, ...|
|  1|  D|         [[51, ghi]]|
|  2|  D|[[45, abc], [38, ...|
|  2|  C|         [[40, abc]]|
+---+---+--------------------+

Then join the aggregated df1 with df2:

# Inner join on the shared keys; a and b appear only once in the result
df3 = df1.join(df2, on=["a", "b"])

df3.show()
+---+---+--------------------+---+---+
|  a|  b|                 df1|  e|  f|
+---+---+--------------------+---+---+
|  1|  C|[[26, abc], [27, ...|  2|cba|
|  1|  D|         [[51, ghi]]|  3|ihg|
|  2|  D|[[45, abc], [38, ...|  9|cba|
|  2|  C|         [[40, abc]]|  7|cba|
+---+---+--------------------+---+---+