I have two tables:
+-----+-----+
|store|sales|
+-----+-----+
|    F| 4000|
|    M| 3000|
|    A| 4000|
+-----+-----+

+-----+------+
|  upc| store|
+-----+------+
|40288|[F, M]|
|42114|   [M]|
|39192|[F, A]|
+-----+------+
I wish to have the final table as:
+-----+------+-----+
|  upc| store|sales|
+-----+------+-----+
|40288|[F, M]| 7000|
|42114|   [M]| 3000|
|39192|[F, A]| 8000|
+-----+------+-----+
Please use this code for data frame generation:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SparkByExamples.com").getOrCreate()

data2 = [
    ("F", 4000),
    ("M", 3000),
    ("A", 4000),
]
schema = StructType(
    [
        StructField("store", StringType(), True),
        StructField("sales", IntegerType(), True),
    ]
)
df11 = spark.createDataFrame(data=data2, schema=schema)

data3 = [
    ("40288", ["F", "M"]),
    ("42114", ["M"]),
    ("39192", ["F", "A"]),
]
schema = StructType(
    [
        StructField("upc", StringType(), True),
        StructField("store", StringType(), True),
    ]
)
df22 = spark.createDataFrame(data=data3, schema=schema)
I can make this work using loops, but that will be very inefficient for big data. I have this loop-based code for a pandas data frame, but I am now migrating to PySpark, so I need an equivalent. Is there a better way, without loops, to get the final table shown above?
for i, row in df22.iterrows():
    new_sales = df11[
        df11.store.isin(df22[df22.upc == row.upc]["store"].values[0])
    ]["sales"].sum()
    df22.at[i, "sales"] = new_sales
CodePudding user response:
You can join based on contains. After the join, group by upc and store in df22 and sum sales.
from pyspark.sql import functions as F

df11_with_df22 = df11.join(df22, df22["store"].contains(df11["store"]))
df11_with_df22.groupBy(df22["upc"], df22["store"]).agg(
    F.sum("sales").alias("sales")
).show()
Output:
+-----+------+-----+
|  upc| store|sales|
+-----+------+-----+
|40288|[F, M]| 7000|
|39192|[F, A]| 8000|
|42114|   [M]| 3000|
+-----+------+-----+
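Note that contains here does substring matching, which only works if df22["store"] is a plain string (the question's schema declares it as StringType); store codes that are substrings of one another would match spuriously. If store is a genuine array column, as in the next answer, a membership test is the safer equivalent. A minimal sketch, assuming the ArrayType version of df22 built below; d1, d2, and joined are illustrative names:

from pyspark.sql import functions as F

# Alias both sides so array_contains can refer to each "store" column
# unambiguously inside the SQL expression.
d1 = df11.alias("d1")
d2 = df22.alias("d2")
joined = d2.join(d1, F.expr("array_contains(d2.store, d1.store)"))
joined.groupBy("upc", "d2.store").agg(F.sum("sales").alias("sales")).show()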
CodePudding user response:
Modifying your input a little, because df22["store"] is of type Array(String) in your sample:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType

data3 = [
    ("40288", ["F", "M"]),
    ("42114", ["M"]),
    ("39192", ["F", "A"]),
]
schema = StructType(
    [
        StructField("upc", StringType(), True),
        StructField("store", ArrayType(StringType()), True),
    ]
)
df22 = spark.createDataFrame(data=data3, schema=schema)
df22 is now more coherent:
df22.show()
+-----+------+
|  upc| store|
+-----+------+
|40288|[F, M]|
|42114|   [M]|
|39192|[F, A]|
+-----+------+
df22.printSchema()
root
 |-- upc: string (nullable = true)
 |-- store: array (nullable = true)
 |    |-- element: string (containsNull = true)
From that data, I explode the store column, use it to join with df11, then aggregate on upc to recreate the store as a list and sum the sales.
from pyspark.sql import functions as F

df = (
    df22.withColumn("store", F.explode("store"))
    .join(df11, on="store")
    .groupBy("upc")
    .agg(F.collect_list("store").alias("store"), F.sum("sales").alias("sales"))
)
And the result:
df.show()
+-----+------+-----+
|  upc| store|sales|
+-----+------+-----+
|42114|   [M]| 3000|
|40288|[F, M]| 7000|
|39192|[F, A]| 8000|
+-----+------+-----+
df.printSchema()
root
 |-- upc: string (nullable = true)
 |-- store: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- sales: long (nullable = true)
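Two hedged follow-ups on this approach. First, collect_list gives no ordering guarantee after the shuffle, so the rebuilt store array may not preserve the original element order; if a deterministic (alphabetical) order is acceptable, array_sort can provide one. Second, the inner join silently drops any upc whose stores have no match in df11; a left join with missing sales coalesced to 0 keeps them. A sketch of both, with df_sorted and df_keep_all as illustrative names:

from pyspark.sql import functions as F

# Deterministic (alphabetical) ordering for the rebuilt store array.
df_sorted = df.withColumn("store", F.array_sort("store"))

# Variant that keeps every upc even when none of its stores appear in
# df11: left join, then treat missing sales as 0 before summing.
df_keep_all = (
    df22.withColumn("store", F.explode("store"))
    .join(df11, on="store", how="left")
    .groupBy("upc")
    .agg(
        F.collect_list("store").alias("store"),
        F.sum(F.coalesce("sales", F.lit(0))).alias("sales"),
    )
)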