This is a sample dataframe of the data that I have:
from pyspark.sql.functions import *
from pyspark.sql.types import StringType, IntegerType, DateType, StructType, StructField
from datetime import datetime
from pyspark.sql import Window
data2 = [
(datetime.strptime("2020/12/29", "%Y/%m/%d"), "Store B", "Product 1", 0),
(datetime.strptime("2020/12/29", "%Y/%m/%d"), "Store B", "Product 2", 1),
(datetime.strptime("2020/12/31", "%Y/%m/%d"), "Store A", "Product 2", 1),
(datetime.strptime("2020/12/31", "%Y/%m/%d"), "Store A", "Product 3", 1),
(datetime.strptime("2021/01/01", "%Y/%m/%d"), "Store A", "Product 1", 1),
(datetime.strptime("2021/01/01", "%Y/%m/%d"), "Store A", "Product 2", 3),
(datetime.strptime("2021/01/01", "%Y/%m/%d"), "Store A", "Product 3", 2),
(datetime.strptime("2021/01/01", "%Y/%m/%d"), "Store B", "Product 1", 10),
(datetime.strptime("2021/01/01", "%Y/%m/%d"), "Store B", "Product 2", 15),
(datetime.strptime("2021/01/01", "%Y/%m/%d"), "Store B", "Product 3", 9),
(datetime.strptime("2021/01/02", "%Y/%m/%d"), "Store A", "Product 1", 0),
(datetime.strptime("2021/01/03", "%Y/%m/%d"), "Store A", "Product 2", 2)
]
schema = StructType([
    StructField("date", DateType(), True),
    StructField("store", StringType(), True),
    StructField("product", StringType(), True),
    StructField("stock_c", IntegerType(), True)
])
df = spark.createDataFrame(data=data2,schema=schema)
df.printSchema()
df.show(truncate=False)
root
|-- date: date (nullable = true)
|-- store: string (nullable = true)
|-- product: string (nullable = true)
|-- stock_c: integer (nullable = true)
+----------+-------+---------+-------+
|date      |store  |product  |stock_c|
+----------+-------+---------+-------+
|2020-12-29|Store B|Product 1|0      |
|2020-12-29|Store B|Product 2|1      |
|2020-12-31|Store A|Product 2|1      |
|2020-12-31|Store A|Product 3|1      |
|2021-01-01|Store A|Product 1|1      |
|2021-01-01|Store A|Product 2|3      |
|2021-01-01|Store A|Product 3|2      |
|2021-01-01|Store B|Product 1|10     |
|2021-01-01|Store B|Product 2|15     |
|2021-01-01|Store B|Product 3|9      |
|2021-01-02|Store A|Product 1|0      |
|2021-01-03|Store A|Product 2|2      |
+----------+-------+---------+-------+
Column stock_c represents the cumulative stock of the product in the store.
I want to create two new columns. The first tells me how many products the store has or has had in the past; this one is easy. The second is the number of products that have stock in that store on that day, and this is the one I can't work out.
This is the code that I used:
windowStore = Window.partitionBy("store").orderBy("date")
df \
.withColumn("num_products", approx_count_distinct("product").over(windowStore)) \
.withColumn("num_products_with_stock", approx_count_distinct(when(col("stock_c") > 0, col("product"))).over(windowStore)) \
.show()
This is what I get:
+----------+-------+---------+-------+------------+-----------------------+
|      date|  store|  product|stock_c|num_products|num_products_with_stock|
+----------+-------+---------+-------+------------+-----------------------+
|2020-12-31|Store A|Product 2|      1|           2|                      2|
|2020-12-31|Store A|Product 3|      1|           2|                      2|
|2021-01-01|Store A|Product 1|      1|           3|                      3|
|2021-01-01|Store A|Product 2|      3|           3|                      3|
|2021-01-01|Store A|Product 3|      2|           3|                      3|
|2021-01-02|Store A|Product 1|      0|           3|                      3|
|2021-01-03|Store A|Product 2|      2|           3|                      3|
|2020-12-29|Store B|Product 1|      0|           2|                      1|
|2020-12-29|Store B|Product 2|      1|           2|                      1|
|2021-01-01|Store B|Product 1|     10|           3|                      3|
|2021-01-01|Store B|Product 2|     15|           3|                      3|
|2021-01-01|Store B|Product 3|      9|           3|                      3|
+----------+-------+---------+-------+------------+-----------------------+
This is what I would like to get:
+----------+-------+---------+-------+------------+-----------------------+
|      date|  store|  product|stock_c|num_products|num_products_with_stock|
+----------+-------+---------+-------+------------+-----------------------+
|2020-12-31|Store A|Product 2|      1|           2|                      2|
|2020-12-31|Store A|Product 3|      1|           2|                      2|
|2021-01-01|Store A|Product 1|      1|           3|                      3|
|2021-01-01|Store A|Product 2|      3|           3|                      3|
|2021-01-01|Store A|Product 3|      2|           3|                      3|
|2021-01-02|Store A|Product 1|      0|           3|                      2|
|2021-01-03|Store A|Product 2|      2|           3|                      2|
|2020-12-29|Store B|Product 1|      0|           2|                      1|
|2020-12-29|Store B|Product 2|      1|           2|                      1|
|2021-01-01|Store B|Product 1|     10|           3|                      3|
|2021-01-01|Store B|Product 2|     15|           3|                      3|
|2021-01-01|Store B|Product 3|      9|           3|                      3|
+----------+-------+---------+-------+------------+-----------------------+
The key is in these two rows: Product 1 no longer has stock, so the column should show that only 2 products have stock (Product 2 and Product 3).
|2021-01-02|Store A|Product 1|      0|           3|                      2|
|2021-01-03|Store A|Product 2|      2|           3|                      2|
How can I achieve what I want?
Thanks in advance.
CodePudding user response:
Below is the code I used to fix the num_products_with_stock column. I created a conditional helper column, hasItem, that replaces the product with None when stock_c is 0. The rest is very close to your code, except that F.approx_count_distinct runs on this new column, over a window partitioned by store and date.
from pyspark.sql import functions as F
from pyspark.sql import Window as W
window1 = W.partitionBy("store").orderBy("date")
window2 = W.partitionBy(["store", "date"]).orderBy("date")
df = (df
.withColumn("num_products", F.approx_count_distinct("product").over(window1))
.withColumn('hasItem', F.when(F.col('stock_c') > 0, F.col('product')).otherwise(None))
.withColumn("num_products_with_stock", F.approx_count_distinct(F.col("hasItem")).over(window2))
.drop('hasItem')
)
df.show()
Hope this solves your issue!
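For intuition, here is a minimal plain-Python sketch (no Spark needed) of what the conditional count over window2 works out to on the sample data. It is an emulation, not the Spark code itself: the function name products_with_stock_per_day is made up for illustration, dates are plain strings, and an exact distinct count stands in for approx_count_distinct, which is an assumption that should be harmless at this tiny cardinality.

```python
from collections import defaultdict

# (date, store, product, stock_c) rows copied from the question's sample data
rows = [
    ("2020-12-29", "Store B", "Product 1", 0),
    ("2020-12-29", "Store B", "Product 2", 1),
    ("2020-12-31", "Store A", "Product 2", 1),
    ("2020-12-31", "Store A", "Product 3", 1),
    ("2021-01-01", "Store A", "Product 1", 1),
    ("2021-01-01", "Store A", "Product 2", 3),
    ("2021-01-01", "Store A", "Product 3", 2),
    ("2021-01-01", "Store B", "Product 1", 10),
    ("2021-01-01", "Store B", "Product 2", 15),
    ("2021-01-01", "Store B", "Product 3", 9),
    ("2021-01-02", "Store A", "Product 1", 0),
    ("2021-01-03", "Store A", "Product 2", 2),
]


def products_with_stock_per_day(rows):
    """Count distinct products with stock_c > 0 per (store, date).

    Hypothetical helper: mirrors the hasItem column (product, or None when
    stock_c is 0) followed by a distinct count that ignores None.
    """
    seen = defaultdict(set)
    for date, store, product, stock in rows:
        seen[(store, date)]  # create the key so days without stock count as 0
        if stock > 0:
            seen[(store, date)].add(product)
    return {key: len(products) for key, products in seen.items()}


counts = products_with_stock_per_day(rows)
print(counts[("Store A", "2021-01-01")])  # 3
print(counts[("Store A", "2021-01-02")])  # 0
print(counts[("Store A", "2021-01-03")])  # 1
```

Under these assumptions, only the rows that exist on a given day are counted, so Store A comes out as 0 on 2021-01-02 and 1 on 2021-01-03 rather than the 2 asked for. Getting 2 there needs the last known stock of the products that have no row on that day.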
CodePudding user response:
I finally solved it with the help of @danimille.
First, I filled in the missing dates for every store and product, and then calculated the number of products with stock using a helper column called has_stock:
from datetime import timedelta
from pyspark.sql.types import ArrayType, TimestampType
def dates_between(t1, t2):
    # Every day from t1 to t2, both ends included
    return [t1 + timedelta(days=x) for x in range(0, int((t2-t1).days) + 1)]
dates_between_udf = udf(dates_between, ArrayType(TimestampType()))
date_filler = (
df
.withColumn('date', to_timestamp(to_date('date'))) # Big ugly hack: work with timestamps instead of dates
.withColumn("max_date", max("date").over(Window.partitionBy("store")))
.withColumn("min_date", min("date").over(Window.partitionBy("store")))
.withColumn("products", collect_set("product").over(Window.partitionBy("store")))
.withColumn("dates", dates_between_udf(col("min_date"), col("max_date")))
.select("store", "products", "dates")
.distinct()
.withColumn("product", explode("products"))
.withColumn("date", explode("dates"))
.drop("products", "dates")
)
(
df
.join(date_filler, on = ["store", "product", "date"], how = "full")
.withColumn(
"stock_c",
last("stock_c", ignorenulls=True).over(Window.partitionBy("store", "product").orderBy(col("date")))
)
.na.fill(0, "stock_c")
.withColumn("num_products", approx_count_distinct("product").over(windowStore))
.withColumn("has_stock", when(col("stock_c") > 0, 1).otherwise(0))
.withColumn("num_products_with_stock", sum("has_stock").over(Window.partitionBy("store", "date")))
.show()
)
The result is the following:
+-------+---------+-------------------+-------+------------+-----------------------+---------+
|  store|  product|               date|stock_c|num_products|num_products_with_stock|has_stock|
+-------+---------+-------------------+-------+------------+-----------------------+---------+
|Store A|Product 1|2020-12-31 00:00:00|      0|           3|                      2|        0|
|Store A|Product 2|2020-12-31 00:00:00|      1|           3|                      2|        1|
|Store A|Product 3|2020-12-31 00:00:00|      1|           3|                      2|        1|
|Store A|Product 1|2021-01-01 00:00:00|      1|           3|                      3|        1|
|Store A|Product 2|2021-01-01 00:00:00|      3|           3|                      3|        1|
|Store A|Product 3|2021-01-01 00:00:00|      2|           3|                      3|        1|
|Store A|Product 1|2021-01-02 00:00:00|      0|           3|                      2|        0|
|Store A|Product 2|2021-01-02 00:00:00|      3|           3|                      2|        1|
|Store A|Product 3|2021-01-02 00:00:00|      2|           3|                      2|        1|
|Store A|Product 1|2021-01-03 00:00:00|      0|           3|                      2|        0|
|Store A|Product 2|2021-01-03 00:00:00|      2|           3|                      2|        1|
|Store A|Product 3|2021-01-03 00:00:00|      2|           3|                      2|        1|
|Store B|Product 1|2020-12-29 00:00:00|      0|           3|                      1|        0|
|Store B|Product 2|2020-12-29 00:00:00|      1|           3|                      1|        1|
|Store B|Product 3|2020-12-29 00:00:00|      0|           3|                      1|        0|
|Store B|Product 1|2020-12-30 00:00:00|      0|           3|                      1|        0|
|Store B|Product 2|2020-12-30 00:00:00|      1|           3|                      1|        1|
|Store B|Product 3|2020-12-30 00:00:00|      0|           3|                      1|        0|
|Store B|Product 1|2020-12-31 00:00:00|      0|           3|                      1|        0|
|Store B|Product 2|2020-12-31 00:00:00|      1|           3|                      1|        1|
+-------+---------+-------------------+-------+------------+-----------------------+---------+
only showing top 20 rows
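As a cross-check, the same fill-then-count logic can be sketched in plain Python, without Spark. This is only an illustration under stated assumptions: the function name products_with_stock is made up, dates are datetime.date values, and a product with no earlier observation in a store is treated as having no stock, which is what the na.fill(0) step above does.

```python
from datetime import date, timedelta

# (date, store, product, stock_c) rows copied from the question's sample data
rows = [
    (date(2020, 12, 29), "Store B", "Product 1", 0),
    (date(2020, 12, 29), "Store B", "Product 2", 1),
    (date(2020, 12, 31), "Store A", "Product 2", 1),
    (date(2020, 12, 31), "Store A", "Product 3", 1),
    (date(2021, 1, 1), "Store A", "Product 1", 1),
    (date(2021, 1, 1), "Store A", "Product 2", 3),
    (date(2021, 1, 1), "Store A", "Product 3", 2),
    (date(2021, 1, 1), "Store B", "Product 1", 10),
    (date(2021, 1, 1), "Store B", "Product 2", 15),
    (date(2021, 1, 1), "Store B", "Product 3", 9),
    (date(2021, 1, 2), "Store A", "Product 1", 0),
    (date(2021, 1, 3), "Store A", "Product 2", 2),
]


def dates_between(start, end):
    """Every day from start to end, both ends included."""
    return [start + timedelta(days=x) for x in range((end - start).days + 1)]


def products_with_stock(rows):
    """Return {(store, day): number of products whose last known stock is > 0}."""
    by_store = {}
    for day, store, product, stock in rows:
        by_store.setdefault(store, {})[(product, day)] = stock

    result = {}
    for store, observed in by_store.items():
        products = sorted({p for p, _ in observed})
        days = [d for _, d in observed]
        for product in products:
            last_stock = 0  # assumption: no earlier observation means no stock
            for day in dates_between(min(days), max(days)):
                # Carry the last known stock forward over days without a row
                last_stock = observed.get((product, day), last_stock)
                result.setdefault((store, day), 0)
                if last_stock > 0:
                    result[(store, day)] += 1
    return result


filled_counts = products_with_stock(rows)
print(filled_counts[("Store A", date(2021, 1, 2))])    # 2
print(filled_counts[("Store A", date(2021, 1, 3))])    # 2
print(filled_counts[("Store B", date(2020, 12, 30))])  # 1
```

On the sample data this agrees with the num_products_with_stock column in the table above, including the days that had no rows in the original dataframe.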