from pyspark.sql import SparkSession
from pyspark.sql import functions as F
# Create PySpark dataframe
columns = ["user","hiring_date","termination_date"]
data = [("A", "1995-09-08", "1997-09-09"), ("A", "2003-05-08", "2006-11-09"),
("A", "2000-05-06", "2003-05-09"), ("B", "2007-06-27", "2008-05-27"),
("C", "2003-01-20", "2006-01-19"), ("C", "2011-04-03", "2011-04-04")]
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
rdd = spark.sparkContext.parallelize(data)
df = spark \
.createDataFrame(rdd) \
.toDF(*columns) \
.withColumn('hiring_date', F.expr('CAST(hiring_date AS DATE)')) \
.withColumn('termination_date', F.expr('CAST(termination_date AS DATE)'))
df.show()
+----+-----------+----------------+
|user|hiring_date|termination_date|
+----+-----------+----------------+
|   A| 1995-09-08|      1997-09-09|
|   A| 2003-05-08|      2006-11-09|
|   A| 2000-05-06|      2003-05-09|
|   B| 2007-06-27|      2008-05-27|
|   C| 2003-01-20|      2006-01-19|
|   C| 2011-04-03|      2011-04-04|
+----+-----------+----------------+
In the above example, I have multiple users, each with a start date (hiring_date) and an end date (termination_date). A user can have a single row or several rows. In addition, users can hold multiple jobs at the same time (overlapping hiring and termination dates).
For each user, I need to calculate the following:
- The number of days the user was working. Overlapping dates should not be counted multiple times.
- The number of days the user was not working (i.e., was on vacation).
CodePudding user response:
Full code (this is implemented in Scala but it is very similar if not identical to Python):
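// Assumes a SparkSession named `spark` is in scope (as in spark-shell)
import org.apache.spark.sql.functions._
import spark.implicits._
var ds = spark.sparkContext.parallelize(Seq(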
("A", "1995-09-08", "1997-09-09"),
("A", "2003-05-08", "2006-11-09"),
("A", "2000-05-06", "2003-05-09"),
("B", "2007-06-27", "2008-05-27"),
("C", "2003-01-20", "2006-01-19"),
("C", "2011-04-03", "2011-04-04"),
)).toDF("user", "hiring_date", "termination_date")
// Convert the strings to date first
ds = ds
.withColumn("hiring_date", to_date(col("hiring_date"), "yyyy-MM-dd"))
.withColumn("termination_date", to_date(col("termination_date"), "yyyy-MM-dd"))
// Find the working days for each employee, where we generate dates from start to end for intervals
val workDays = ds
.withColumn("grouped", sequence(col("hiring_date"), col("termination_date")))
.withColumn("grouped", explode(col("grouped")))
// We drop duplicates because of the overlapping dates
.select("user", "grouped").dropDuplicates()
// We create an indicator, so we know later which date is holiday and which is not
.withColumn("ind", lit(1))
// We generate a full history of the first and last date the user was working, for all jobs
val fullDays = ds
.groupBy("user").agg(min("hiring_date").as("min"), max("termination_date").as("max"))
.withColumn("grouped", sequence(col("min"), col("max")).as("grouped"))
.withColumn("grouped", explode(col("grouped")))
.select("user", "grouped")
// We join fullDays with workDays, wherever 'ind' is 1, we have workdays, otherwise non workdays
val result = fullDays.join(workDays, Seq("user", "grouped"), "left")
// We filter working days, we group by user and we count
val workingDays = result.filter(col("ind").equalTo(1)).groupBy("user").count()
// We filter non working days, we group by user and we count
val nonWorkingDays = result.filter(col("ind").isNull).groupBy("user").count()
workingDays.show(10)
+----+-----+
|user|count|
+----+-----+
|   B|  336|
|   C| 1098|
|   A| 3112|
+----+-----+
nonWorkingDays.show(10)
+----+-----+
|user|count|
+----+-----+
|   C| 1899|
|   A|  969|
+----+-----+
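For anyone who wants to sanity-check the numbers without a Spark session, here is a minimal plain-Python sketch of the same idea: expand every job into individual dates, deduplicate them per user, and compare against the full span from first to last date. The function name `count_days` and the dictionary layout are my own choices for illustration, not part of the Spark code above.

```python
from datetime import date, timedelta

# Same sample rows as in the question: (user, hiring_date, termination_date)
rows = [
    ("A", "1995-09-08", "1997-09-09"), ("A", "2003-05-08", "2006-11-09"),
    ("A", "2000-05-06", "2003-05-09"), ("B", "2007-06-27", "2008-05-27"),
    ("C", "2003-01-20", "2006-01-19"), ("C", "2011-04-03", "2011-04-04"),
]

def count_days(rows):
    """Return {user: (working_days, nonworking_days)}; end dates are inclusive."""
    worked = {}  # user -> set of distinct dates worked (explode + dropDuplicates)
    for user, start, end in rows:
        day, last = date.fromisoformat(start), date.fromisoformat(end)
        days = worked.setdefault(user, set())
        while day <= last:
            days.add(day)  # a set keeps each overlapping date only once
            day += timedelta(days=1)

    result = {}
    for user, days in worked.items():
        # Full span from the first hiring date to the last termination date
        span = (max(days) - min(days)).days + 1
        result[user] = (len(days), span - len(days))
    return result

counts = count_days(rows)
for user in sorted(counts):
    print(user, counts[user])
```

Unlike the left join above, this reports 0 non-working days for user B instead of leaving the user out of the result.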
I hope this is what you need, good luck!
CodePudding user response:
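If by working days you mean excluding weekends (Sat, Sun), you can build an array of dates for each row and then keep only the dates that fall within the work week (using dayofweek).
# Assumes data_sdf is the question's dataframe with both date columns cast to date
from pyspark.sql import functions as func, Window as wd
data_sdf. \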
withColumn('prev_tdt',
func.lag('termination_date').over(wd.partitionBy('user').orderBy('hiring_date'))
). \
withColumn('new_hiredt',
func.when(func.col('prev_tdt') >= func.col('hiring_date'), func.date_add('prev_tdt', 1)).
otherwise(func.col('hiring_date'))
). \
withColumn('date_seq',
func.expr('sequence(new_hiredt, termination_date, interval 1 day)')
). \
withColumn('num_workday',
func.size(func.expr('filter(date_seq, x -> dayofweek(x) not in (1, 7))'))
). \
withColumn('tot_days', func.size('date_seq')). \
withColumn('num_nonworkday',
func.coalesce(func.datediff('new_hiredt', 'prev_tdt') - 1, func.lit(0))
). \
groupBy('user'). \
agg(func.sum('num_workday').alias('num_workday'),
func.sum('num_nonworkday').alias('num_nonworkday')
). \
orderBy('user'). \
show()
# +----+-----------+--------------+
# |user|num_workday|num_nonworkday|
# +----+-----------+--------------+
# |   A|       2222|           969|
# |   B|        240|             0|
# |   C|        785|          1899|
# +----+-----------+--------------+
If you don't want to exclude weekends, you can use the tot_days field (summed per user) as the number of work days. The new_hiredt column holds the adjusted start date for records that overlap with the previous record's termination date, so the overlapping days are not counted twice.
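To make the new_hiredt adjustment and the weekend filter easier to follow, here is a rough plain-Python sketch of the same logic. The name `weekday_counts` is made up for illustration. One deliberate difference: it tracks the latest termination date seen so far rather than only the previous row's. That is my own variation, not something the Spark code above does, and it makes no difference for the sample data.

```python
from datetime import date, timedelta

rows = [
    ("A", "1995-09-08", "1997-09-09"), ("A", "2003-05-08", "2006-11-09"),
    ("A", "2000-05-06", "2003-05-09"), ("B", "2007-06-27", "2008-05-27"),
    ("C", "2003-01-20", "2006-01-19"), ("C", "2011-04-03", "2011-04-04"),
]

def weekday_counts(rows):
    """Return {user: (num_workday, num_nonworkday)}, counting Mon-Fri only."""
    jobs_by_user = {}
    for user, start, end in rows:
        jobs_by_user.setdefault(user, []).append(
            (date.fromisoformat(start), date.fromisoformat(end)))

    result = {}
    for user, jobs in jobs_by_user.items():
        jobs.sort()  # order by hiring date, like the window's orderBy
        workdays = gap_days = 0
        covered_until = None  # latest termination date seen so far (assumption)
        for start, end in jobs:
            if covered_until is not None:
                # Days strictly between the covered period and this job
                gap_days += max((start - covered_until).days - 1, 0)
                # Equivalent of new_hiredt: skip days that are already covered
                start = max(start, covered_until + timedelta(days=1))
            day = start
            while day <= end:
                if day.weekday() < 5:  # Python: Mon=0 .. Sun=6
                    workdays += 1
                day += timedelta(days=1)
            covered_until = end if covered_until is None else max(covered_until, end)
        result[user] = (workdays, gap_days)
    return result

totals = weekday_counts(rows)
for user in sorted(totals):
    print(user, totals[user])
```

Note that Spark's dayofweek numbers Sunday as 1 and Saturday as 7, while Python's weekday() numbers Monday as 0, which is why the filter looks different here.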
CodePudding user response:
In case anyone is interested in the PySpark solution based on vilalabinot's post:
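from pyspark.sql import SparkSession
from pyspark.sql import functions as F
# Note: max and min imported here shadow the Python builtins
from pyspark.sql.functions import col, count, explode, lit, max, min, sequence
# Create PySpark dataframe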
columns = ["user","hiring_date","termination_date"]
data = [("A", "1995-09-08", "1997-09-09"), ("A", "2003-05-08", "2006-11-09"),
("A", "2000-05-06", "2003-05-09"), ("B", "2007-06-27", "2008-05-27"),
("C", "2003-01-20", "2006-01-19"), ("C", "2011-04-03", "2011-04-04")]
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
rdd = spark.sparkContext.parallelize(data)
df = spark \
.createDataFrame(rdd) \
.toDF(*columns) \
.withColumn('hiring_date', F.expr('CAST(hiring_date AS DATE)')) \
.withColumn('termination_date', F.expr('CAST(termination_date AS DATE)'))
# Find the working days for each employee, where we generate dates from start to end for intervals
# We drop duplicates because of the overlapping dates
# We create an indicator, so we know later which date is holiday and which is not
work_days = df \
.withColumn("grouped", sequence(col("hiring_date"), col("termination_date"))) \
.withColumn("grouped", explode(col("grouped"))) \
.select("user", "grouped").dropDuplicates() \
.withColumn("ind", lit(1))
# We generate a full history of the first and last
# date the user was working, for all jobs
full_days = df \
.groupBy("user") \
.agg(min("hiring_date").alias("min"), max("termination_date").alias("max")) \
.withColumn("grouped", sequence(col("min"), col("max")).alias("grouped")) \
.withColumn("grouped", explode(col("grouped"))) \
.select("user", "grouped")
# We join full_days with work_days; wherever 'ind'
# is 1 we have a working day, otherwise a non-working day
result = full_days.join(work_days, ["user", "grouped"], "left")
# We filter working days, we group by user and we count
working_days = result.filter(col("ind") == 1).groupBy("user").agg(count('user').alias('working_days'))
# We filter non working days, we group by user and we count
nonworking_days = result.filter(col("ind").isNull()).groupBy("user").agg(count('user').alias('nonworking_days'))
# Return original dataframe with new values
df_final = df \
.select('user') \
.dropDuplicates() \
.join(working_days, 'user', 'left') \
.join(nonworking_days, 'user', 'left')
df_final.show()
+----+------------+---------------+
|user|working_days|nonworking_days|
+----+------------+---------------+
|   B|         336|           null|
|   A|        3112|            969|
|   C|        1098|           1899|
+----+------------+---------------+