Merge two PySpark DataFrames based on one column containing a list and the other containing values


I have two tables

+-----+-----+
|store|sales|
+-----+-----+
|    F| 4000|
|    M| 3000|
|    A| 4000|
+-----+-----+

+-----+------+
|  upc| store|
+-----+------+
|40288|[F, M]|
|42114|   [M]|
|39192|[F, A]|
+-----+------+

I wish to have the final table as follows:

+-----+------+-----+
|  upc| store|sales|
+-----+------+-----+
|40288|[F, M]| 7000|
|42114|   [M]| 3000|
|39192|[F, A]| 8000|
+-----+------+-----+

Please use this code for DataFrame generation:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql import SparkSession


spark = SparkSession.builder.appName("SparkByExamples.com").getOrCreate()

data2 = [
    ("F", 4000),
    ("M", 3000),
    ("A", 4000),
]
schema = StructType(
    [
        StructField("store", StringType(), True),
        StructField("sales", IntegerType(), True),
    ]
)
df11 = spark.createDataFrame(data=data2, schema=schema)


data3 = [
    ("40288", ["F", "M"]),
    ("42114", ["M"]),
    ("39192", ["F", "A"]),
]
schema = StructType(
    [
        StructField("upc", StringType(), True),
        StructField("store", StringType(), True),
    ]
)
df22 = spark.createDataFrame(data=data3, schema=schema)

I can make this work using loops, but that will be very inefficient for big data. I have this piece of code with loops for a pandas DataFrame, but I am now migrating to PySpark, so I need an equivalent in PySpark. Is there a better way to do this without loops and get the final table shown above?

# pandas version: for each upc, sum the sales of every store in its list
for i, row in df22.iterrows():
    new_sales = df11[df11.store.isin(df22[df22.upc == row.upc]["store"].values[0])][
        "sales"
    ].sum()
    df22.at[i, "sales"] = new_sales

CodePudding user response:

You can join based on contains. After the join, group by upc and store from df22 and sum sales.

from pyspark.sql import functions as F

# Keep the rows where the store column of df22 contains the store value of df11
df11_with_df22 = df11.join(df22, df22["store"].contains(df11["store"]))

# Group back to one row per upc/store and sum the matched sales
df11_with_df22.groupBy(df22["upc"], df22["store"]).agg(F.sum("sales").alias("sales")).show()

Output

+-----+------+-----+
|  upc| store|sales|
+-----+------+-----+
|40288|[F, M]| 7000|
|39192|[F, A]| 8000|
|42114|   [M]| 3000|
+-----+------+-----+
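
Note that Column.contains does a substring check, so it applies when df22["store"] is stored as a plain string. If store is instead an ArrayType column (as built in the next response), a minimal sketch of the equivalent join, assuming that array schema, would use the array_contains SQL function:

from pyspark.sql import functions as F

# Sketch, assuming df22.store is ArrayType(StringType()).
# Rename df11.store so the join condition is unambiguous.
df11_renamed = df11.withColumnRenamed("store", "single_store")

(
    df22.join(df11_renamed, F.expr("array_contains(store, single_store)"))
    .groupBy("upc", "store")
    .agg(F.sum("sales").alias("sales"))
    .show()
)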

CodePudding user response:

Modifying your input a little, because df22["store"] is of type Array(String) in your sample:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType


data3 = [
    ("40288", ["F", "M"]),
    ("42114", ["M"]),
    ("39192", ["F", "A"]),
]
schema = StructType(
    [
        StructField("upc", StringType(), True),
        StructField("store", ArrayType(StringType()), True),
    ]
)
df22 = spark.createDataFrame(data=data3, schema=schema)

With this schema, df22 is more coherent:

df22.show()
+-----+------+
|  upc| store|
+-----+------+
|40288|[F, M]|
|42114|   [M]|
|39192|[F, A]|
+-----+------+

df22.printSchema()
root
 |-- upc: string (nullable = true)
 |-- store: array (nullable = true)
 |    |-- element: string (containsNull = true)

From that data, I explode the store column, use it for the join, then aggregate on upc to rebuild the store list and sum the sales.

from pyspark.sql import functions as F


df = (
    df22.withColumn("store", F.explode("store"))  # one row per (upc, store)
    .join(df11, on="store")                       # attach the sales of each store
    .groupBy("upc")
    .agg(F.collect_list("store").alias("store"), F.sum("sales").alias("sales"))
)

And the result:

df.show()
+-----+------+-----+
|  upc| store|sales|
+-----+------+-----+
|42114|   [M]| 3000|
|40288|[F, M]| 7000|
|39192|[F, A]| 8000|
+-----+------+-----+

df.printSchema()
root
 |-- upc: string (nullable = true)
 |-- store: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- sales: long (nullable = true)
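
One caveat: after the shuffle introduced by groupBy, collect_list does not guarantee the original element order inside the rebuilt store array. A small sketch, assuming alphabetical order is acceptable, sorts the array to make the output deterministic:

# Sketch: sort the rebuilt array so the store order is deterministic.
df_sorted = df.withColumn("store", F.array_sort("store"))
df_sorted.show()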