Home > Software engineering >  Window & Aggregate functions in Pyspark SQL/SQL
Window & Aggregate functions in Pyspark SQL/SQL

Time:10-18

After the answer by @Vaebhav realized the question was not set up correctly. Hence editing it with his code snippet.

I have the following table

from pyspark.sql.types import IntegerType,TimestampType,DoubleType

input_str = """
4219,2018-01-01 08:10:00,3.0,50.78,
4216,2018-01-02 08:01:00,5.0,100.84,
4217,2018-01-02 20:00:00,4.0,800.49,
4139,2018-01-03 11:05:00,1.0,400.0,
4170,2018-01-03 09:10:00,2.0,100.0,
4029,2018-01-06 09:06:00,6.0,300.55,
4029,2018-01-06 09:16:00,2.0,310.55,
4217,2018-01-06 09:36:00,5.0,307.55,
1139,2018-01-21 11:05:00,1.0,400.0,
2170,2018-01-21 09:10:00,2.0,100.0,
4218,2018-02-06 09:36:00,5.0,307.55,
4218,2018-02-06 09:36:00,5.0,307.55
""".split(",")

input_values = list(map(lambda x: x.strip() if x.strip() != '' else None, input_str))
cols = list(map(lambda x: x.strip() if x.strip() != 'null' else None, "customer_id,timestamp,quantity,price".split(',')))
n = len(input_values)
n_cols = 4
input_list = [tuple(input_values[i:i n_cols]) for i in range(0,n,n_cols)]
sparkDF = sqlContext.createDataFrame(input_list,cols)
sparkDF = sparkDF.withColumn('customer_id',F.col('customer_id').cast(IntegerType()))\
                 .withColumn('timestamp',F.col('timestamp').cast(TimestampType()))\
                 .withColumn('quantity',F.col('quantity').cast(IntegerType()))\
                 .withColumn('price',F.col('price').cast(DoubleType()))

I want to calculate the aggergate as follows :

trxn_date unique_cust_visits next_7_day_visits next_30_day_visits
2018-01-01 1 7 9
2018-01-02 2 6 8
2018-01-03 2 4 6
2018-01-06 2 2 4
2018-01-21 2 2 3
2018-02-06 1 1 1

where the

  • trxn_date is date from the timestamp column,
  • daily_cust_visits is unique count of customers,
  • next_7_day_visits is a count of customers on a 7 day rolling window basis.
  • next_30_day_visits is a count of customers on a 30 day rolling window basis.

I want to write the code as a single SQL query.

CodePudding user response:

You can achieve this by using ROW rather than a RANGE Frame Type , a good explanation can be found here

ROW - based on physical offsets from the position of the current input row

RANGE - based on logical offsets from the position of the current input row

Also in your implementation ,a PARTITION BY clause would be redundant, as it wont create the required Frames for a look-ahead.

Data Preparation

input_str = """
4219,2018-01-02 08:10:00,3.0,50.78,
4216,2018-01-02 08:01:00,5.0,100.84,
4217,2018-01-02 20:00:00,4.0,800.49,
4139,2018-01-03 11:05:00,1.0,400.0,
4170,2018-01-03 09:10:00,2.0,100.0,
4029,2018-01-06 09:06:00,6.0,300.55,
4029,2018-01-06 09:16:00,2.0,310.55,
4217,2018-01-06 09:36:00,5.0,307.55
""".split(",")

input_values = list(map(lambda x: x.strip() if x.strip() != '' else None, input_str))

cols = list(map(lambda x: x.strip() if x.strip() != 'null' else None, "customer_id  timestamp   quantity    price".split('\t')))
        
n = len(input_values)
n_cols = 4

input_list = [tuple(input_values[i:i n_cols]) for i in range(0,n,n_cols)]

sparkDF = sql.createDataFrame(input_list,cols)

sparkDF = sparkDF.withColumn('customer_id',F.col('customer_id').cast(IntegerType()))\
                 .withColumn('timestamp',F.col('timestamp').cast(TimestampType()))\
                 .withColumn('quantity',F.col('quantity').cast(IntegerType()))\
                 .withColumn('price',F.col('price').cast(DoubleType()))

sparkDF.show()

 ----------- ------------------- -------- ------ 
|customer_id|          timestamp|quantity| price|
 ----------- ------------------- -------- ------ 
|       4219|2018-01-02 08:10:00|       3| 50.78|
|       4216|2018-01-02 08:01:00|       5|100.84|
|       4217|2018-01-02 20:00:00|       4|800.49|
|       4139|2018-01-03 11:05:00|       1| 400.0|
|       4170|2018-01-03 09:10:00|       2| 100.0|
|       4029|2018-01-06 09:06:00|       6|300.55|
|       4029|2018-01-06 09:16:00|       2|310.55|
|       4217|2018-01-06 09:36:00|       5|307.55|
 ----------- ------------------- -------- ------ 

Window Aggregates

sparkDF.createOrReplaceTempView("transactions")

sql.sql("""
        SELECT 
            TO_DATE(timestamp) as trxn_date
            ,COUNT(DISTINCT customer_id) as unique_cust_visits
            ,SUM(COUNT(DISTINCT customer_id)) OVER (
                        ORDER BY 'timestamp'
                        ROWS BETWEEN CURRENT ROW AND 7 FOLLOWING
            ) as next_7_day_visits
        FROM transactions
        GROUP BY 1
""").show()

 ---------- ------------------ ----------------- 
| trxn_date|unique_cust_visits|next_7_day_visits|
 ---------- ------------------ ----------------- 
|2018-01-02|                 3|                7|
|2018-01-03|                 2|                4|
|2018-01-06|                 2|                2|
 ---------- ------------------ ----------------- 

CodePudding user response:

Building upon @Vaebhav's answer the required query in this case is

sqlContext.sql("""
        SELECT 
            TO_DATE(timestamp) as trxn_date
            ,COUNT(DISTINCT customer_id) as unique_cust_visits
            ,SUM(COUNT(DISTINCT customer_id)) OVER (
                        ORDER BY CAST(TO_DATE(timestamp) AS TIMESTAMP) DESC
                        RANGE BETWEEN INTERVAL 7 DAYS PRECEDING AND CURRENT ROW
            ) as next_7_day_visits
            ,SUM(COUNT(DISTINCT customer_id)) OVER (
                        ORDER BY CAST(TO_DATE(timestamp) AS TIMESTAMP) DESC
                        RANGE BETWEEN INTERVAL 30 DAYS PRECEDING AND CURRENT ROW
            ) as next_30_day_visits
        FROM transactions
        GROUP BY 1
        ORDER by trxn_date
""").show()
trxn_date unique_cust_visits next_7_day_visits next_30_day_visits
2018-01-01 1 7 9
2018-01-02 2 6 8
2018-01-03 2 4 6
2018-01-06 2 2 4
2018-01-21 2 2 3
2018-02-06 1 1 1
  • Related