Home > Back-end >  Merge two spark dataframes using array values
Merge two spark dataframes using array values


I have two Spark dataframes that look as following:

> cities_df

 ---------- --------------------------- 
|   city_id|                     cities|           
 ---------- --------------------------- 
|       22 |[Milan, Turin, Rome]       |
 ---------- --------------------------- 
|       15 |[Naples, Florence, Genoa]  |
 ---------- --------------------------- 
|       43 |[Houston, San Jose, Boston]|
 ---------- --------------------------- 
|       56 |[New York, Dallas, Chicago]|
 ---------- --------------------------- 

> countries_df

 ---------- ---------------------------------- 
|country_id|                         countries|           
 ---------- ---------------------------------- 
|      680 |{'country': [56, 43], 'add': []}  |
 ---------- ---------------------------------- 
|       11 |{'country': [22, 15], 'add': [32]}|
 ---------- ---------------------------------- 

Here, country values from the countries_df are the city ids from the cities_df dataframe.

I need to merge these dataframes to replace the city id for country with their values from the cities_df dataframe.

Expected output:

country_id countries grouped_cities
680 {'country': [56, 43], 'add': []} [New York, Dallas, Chicago, Houston, San Jose, Boston]
11 {'country': [22, 15], 'add': [32]} [Milan, Turin, Rome, Naples, Florence, Genoa]

Obtained grouped_cities value doesn't have to be an array type, it can be just a string.

How can I get this result using PySpark?

CodePudding user response:


from pyspark.sql import functions as F
cities_df = spark.createDataFrame(
    [(22, ['Milan', 'Turin', 'Rome']),
     (15, ['Naples', 'Florence', 'Genoa']),
     (43, ['Houston', 'San Jose', 'Boston']),
     (56, ['New York', 'Dallas', 'Chicago'])],
    ['city_id', 'cities']
countries_df = spark.createDataFrame(
    [(680, {'country': [56, 43], 'add': []}),
     (11, {'country': [22, 15], 'add': [32]})],
    ['country_id', 'countries']


df_expl = countries_df.withColumn('city_id', F.explode(F.col('countries')['country']))
df_joined = df_expl.join(cities_df, 'city_id', 'left')
df = df_joined.groupBy('country_id').agg(
#  ---------- ---------------------------------- ------------------------------------------------------ 
# |country_id|countries                         |grouped_cities                                        |
#  ---------- ---------------------------------- ------------------------------------------------------ 
# |11        |{add -> [32], country -> [22, 15]}|[Naples, Florence, Genoa, Milan, Turin, Rome]         |
# |680       |{add -> [], country -> [56, 43]}  |[Houston, San Jose, Boston, New York, Dallas, Chicago]|
#  ---------- ---------------------------------- ------------------------------------------------------ 

CodePudding user response:

Anaother way of doing it. Create a new column on countries_df using select. Groupby using country_id, and countries column cast as a string. Code below.

new =cities_df.join(countries_df.select('*',explode('countries.country').alias('city_id')), how='left', on='city_id').groupby('country_id',col('countries').cast('string').alias('countries')).agg(flatten(collect_set('cities')).alias('cities')).show(truncate=False)

 ---------- ---------------------------------- ------------------------------------------------------ 
|country_id|countries                         |cities                                                |
 ---------- ---------------------------------- ------------------------------------------------------ 
|11        |{add -> [32], country -> [22, 15]}|[Milan, Turin, Rome, Naples, Florence, Genoa]         |
|680       |{add -> [], country -> [56, 43]}  |[New York, Dallas, Chicago, Houston, San Jose, Boston]|
 ---------- ---------------------------------- ------------------------------------------------------ 
  • Related