Let's say I have the following Spark frame:
+--------------------------+-----+
|timestamp                 |name |
+--------------------------+-----+
|2021-11-06 16:29:00.004204|Alice|
|2021-11-06 16:29:00.004204|Bob  |
+--------------------------+-----+
Now I want to extract the count of records/rows based on the timestamp for a specific name == 'Alice', as follows:
- 1st 12-hr working shift (00:00:00-11:59:59)
- 2nd 12-hr working shift (12:00:00-23:59:59)
- 1st 8-hr working shift (00:00:00-07:59:59)
- 2nd 8-hr working shift (08:00:00-15:59:59)
- 3rd 8-hr working shift (16:00:00-23:59:59)
and return the results back to the Spark frame. I have tried the following approach, unsuccessfully:
import datetime as dt
from pyspark.sql import functions as F
from pyspark.sql.functions import dayofmonth, dayofweek
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

data = [{'name': 'Alice'},
        {'name': 'Bob'}]

schema = StructType([
    StructField("timestamp", TimestampType(), True),
    StructField("date", StringType(), True),
    StructField("name", StringType(), True),
])

# create a Spark dataframe
sdf = spark.createDataFrame(data=data, schema=schema)
sdf.printSchema()
sdf.show(truncate=False)

# generate the date and timestamp columns
new_df = sdf.withColumn('timestamp', F.current_timestamp().cast("timestamp")) \
            .withColumn('date', F.current_date().cast("date")) \
            .withColumn('day_of_month', dayofmonth('timestamp')) \
            .withColumn('day_of_week', ((dayofweek('timestamp') + 5) % 7) + 1)  # start of the week as Monday = 1 (by default Sunday = 1)
#.withColumn("No. records in 1st 12-hrs", from_unixtime(unix_timestamp(col("timestamp"), "yyyy-MM-dd HH:mm:ss"), "HH:mm:ss")) \
#.filter(col("timestamp").between("00:00", "11:59")) \
#.groupBy("No. records in 1st 12-hrs", "name").sum("Count") \
#.withColumn("No. records in 2nd 12-hrs", from_unixtime(unix_timestamp(col("timestamp"), "yyyy-MM-dd HH:mm:ss"), "HH:mm:ss")) \
#.filter(col("timestamp").between("12:00", "23:59")) \
#.groupBy("No. records in 2nd 12-hrs", "name").sum("Count") \
#.withColumn('# No. records in 1st 8-hrs shift (00:00-07:59:59)', ????('timestamp')) \
#.withColumn('# No. records in 2nd 8-hrs shift (08:00-15:59:59)', ????('timestamp')) \
#.withColumn('# No. records in 3rd 8-hrs shift (16:00-23:59:59)', ????('timestamp')) \

new_df.show(truncate=False)
So far, my output is as below, which you can try in a Colab notebook:
+--------------------------+----------+-----+------------+-----------+
|timestamp                 |date      |name |day_of_month|day_of_week|
+--------------------------+----------+-----+------------+-----------+
|2021-11-06 16:17:43.698815|2021-11-06|Alice|6           |6          |
|2021-11-06 16:17:43.698815|2021-11-06|Bob  |6           |6          |
+--------------------------+----------+-----+------------+-----------+
Alternatively, I checked some posts about Spark-based data filtering, as well as a cool answer on grouping a Spark dataframe by date, to apply to the main Spark frame for a specific name as well as the working-shift ranges. Please note that I'm not interested in using a UDF or hacking it via toPandas().
So the expected results for the specific name == 'Alice' should look like this:
+-------------------------+-------------------------+------------------------+------------------------+------------------------+
|No. records in 1st 12-hrs|No. records in 2nd 12-hrs|No. records in 1st 8-hrs|No. records in 2nd 8-hrs|No. records in 3rd 8-hrs|
+-------------------------+-------------------------+------------------------+------------------------+------------------------+
|                         |                         |                        |                        |                        |
+-------------------------+-------------------------+------------------------+------------------------+------------------------+
CodePudding user response:
You can achieve that by checking whether the hour part of the timestamp is between [0, 11], [12, 23], and so on:
import pyspark.sql.functions as F

# each `between` check yields a boolean; casting it to int (1/0) and summing
# counts the rows whose hour falls inside that shift
new_df = sdf.groupBy("name").agg(
    F.sum(F.hour("timestamp").between(0, 11).cast("int")).alias("1st-12-hrs"),
    F.sum(F.hour("timestamp").between(12, 23).cast("int")).alias("2nd-12-hrs"),
    F.sum(F.hour("timestamp").between(0, 7).cast("int")).alias("1st-8-hrs"),
    F.sum(F.hour("timestamp").between(8, 15).cast("int")).alias("2nd-8-hrs"),
    F.sum(F.hour("timestamp").between(16, 23).cast("int")).alias("3rd-8-hrs"),
)
new_df.show()
#+-----+----------+----------+---------+---------+---------+
#|name |1st-12-hrs|2nd-12-hrs|1st-8-hrs|2nd-8-hrs|3rd-8-hrs|
#+-----+----------+----------+---------+---------+---------+
#|Bob  |0         |1         |0        |0        |1        |
#|Alice|0         |1         |0        |0        |1        |
#+-----+----------+----------+---------+---------+---------+
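If you only need the counts for the specific name == 'Alice', you can filter before (or after) the aggregation. Below is a minimal sketch using the same sdf as above (the alice_df name is just for illustration); it uses the equivalent conditional-count formulation F.count(F.when(...)), where F.when without otherwise returns null for non-matching rows and F.count ignores nulls:
import pyspark.sql.functions as F

# restrict to Alice's rows before aggregating; filtering new_df afterwards works too
alice_df = (
    sdf.filter(F.col("name") == "Alice")
       .groupBy("name")
       .agg(
           F.count(F.when(F.hour("timestamp").between(0, 11), 1)).alias("1st-12-hrs"),
           F.count(F.when(F.hour("timestamp").between(12, 23), 1)).alias("2nd-12-hrs"),
           F.count(F.when(F.hour("timestamp").between(0, 7), 1)).alias("1st-8-hrs"),
           F.count(F.when(F.hour("timestamp").between(8, 15), 1)).alias("2nd-8-hrs"),
           F.count(F.when(F.hour("timestamp").between(16, 23), 1)).alias("3rd-8-hrs"),
       )
)
alice_df.show()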