I have a Hive table that records user behavior, like this:
userid | behavior | timestamp | url |
---|---|---|---|
1 | view | 1650022601 | url1 |
1 | click | 1650022602 | url2 |
1 | click | 1650022614 | url3 |
1 | view | 1650022617 | url4 |
1 | click | 1650022622 | url5 |
1 | view | 1650022626 | url7 |
2 | view | 1650022628 | url8 |
2 | view | 1650022631 | url9 |
About 400GB is added to the table every day.
I want to order by timestamp asc and then group the rows so that each 'view' starts a new group, i.e. every row between one 'view' and the next 'view' belongs to the same group. In the table above, the first 3 rows belong to the same group. Then I want to subtract the timestamps within each group, e.g. 1650022614 - 1650022601, as the view time.
How can I do this?
I tried the lag and lead functions, and also Scala like this:
val pairRDD: RDD[(Int, String)] = record.map(x => {
  // a new group starts when the first field of the line is a date string
  if (StringUtil.isDateString(x.split("\\s+")(0))) {
    partition = partition + 1
    (partition, x)
  } else {
    (partition, x)
  }
})
or Java like this:
LongAccumulator part = spark.sparkContext().longAccumulator("part");
// coalesce(1) keeps the accumulator increments in order, but it kills all parallelism
JavaPairRDD<Long, Row> pairRDD = spark.sql(sql).coalesce(1)
    .javaRDD()
    .mapToPair((PairFunction<Row, Long, Row>) row -> {
        // a new group starts at every pageview row
        if ("pageview".equals(row.getAs("event"))) {
            part.add(1L);
        }
        return new Tuple2<>(part.value(), row);
    });
But when the dataset is very large, this code is hopeless, because coalesce(1) forces everything through a single task.
save me plz
Answer:
If you use DataFrames, you can build the partition (group) column with a window that sums a flag column whose value is 1 every time a new partition starts and 0 otherwise.
You can transform an RDD into a DataFrame with the sparkSession.createDataFrame() method, as explained in this answer.
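For example, a minimal sketch, assuming your records are already parsed into a case class; the names Behavior, recordRDD and your_db.user_behavior are only placeholders:

import org.apache.spark.sql.SparkSession

// Hypothetical record type; adjust field names and types to your table
case class Behavior(userid: Int, behavior: String, timestamp: Long, url: String)

val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
import spark.implicits._

// If you already have an RDD of parsed records:
// val df = spark.createDataFrame(recordRDD)   // recordRDD: RDD[Behavior]
// or equivalently: val df = recordRDD.toDF()

// Since the data is already in a Hive table, the simplest way is to read it directly:
val df = spark.table("your_db.user_behavior")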
Back to your problem. In your case, a new partition starts every time the column behavior is equal to "view". So we can start with this condition:
import org.apache.spark.sql.functions.col
val df1 = df.withColumn("is_view", (col("behavior") === "view").cast("integer"))
You get the following dataframe:
+------+--------+----------+----+-------+
|userid|behavior|timestamp |url |is_view|
+------+--------+----------+----+-------+
|1     |view    |1650022601|url1|1      |
|1     |click   |1650022602|url2|0      |
|1     |click   |1650022614|url3|0      |
|1     |view    |1650022617|url4|1      |
|1     |click   |1650022622|url5|0      |
|1     |view    |1650022626|url7|1      |
|2     |view    |1650022628|url8|1      |
|2     |view    |1650022631|url9|1      |
+------+--------+----------+----+-------+
Then you use a window partitioned by userid and ordered by timestamp to sum over the is_view column:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum
val df2 = df1.withColumn("partition", sum("is_view").over(Window.partitionBy("userid").orderBy("timestamp")))
Which gets you the following dataframe:
+------+--------+----------+----+-------+---------+
|userid|behavior|timestamp |url |is_view|partition|
+------+--------+----------+----+-------+---------+
|1     |view    |1650022601|url1|1      |1        |
|1     |click   |1650022602|url2|0      |1        |
|1     |click   |1650022614|url3|0      |1        |
|1     |view    |1650022617|url4|1      |2        |
|1     |click   |1650022622|url5|0      |2        |
|1     |view    |1650022626|url7|1      |3        |
|2     |view    |1650022628|url8|1      |1        |
|2     |view    |1650022631|url9|1      |2        |
+------+--------+----------+----+-------+---------+
Then, you just have to aggregate per userid and partition:
import org.apache.spark.sql.functions.{max, min}
val result = df2.groupBy("userid", "partition")
.agg((max("timestamp") - min("timestamp")).as("duration"))
And you get the following results:
+------+---------+--------+
|userid|partition|duration|
+------+---------+--------+
|1     |1        |13      |
|1     |2        |5       |
|1     |3        |0       |
|2     |1        |0       |
|2     |2        |0       |
+------+---------+--------+
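As a small variation (still a sketch), if you also want to keep the timestamp of the 'view' that opens each group, you can simply add another aggregate; the column name view_start is my own:

import org.apache.spark.sql.functions.{max, min}

val resultWithStart = df2.groupBy("userid", "partition")
  .agg(
    min("timestamp").as("view_start"),                      // timestamp of the opening 'view' row
    (max("timestamp") - min("timestamp")).as("duration")
  )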
The complete Scala code:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, max, min, sum}

val result = df
  .withColumn("is_view", (col("behavior") === "view").cast("integer"))
  .withColumn("partition", sum("is_view").over(Window.partitionBy("userid").orderBy("timestamp")))
  .groupBy("userid", "partition")
  .agg((max("timestamp") - min("timestamp")).as("duration"))
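Since the data already lives in a Hive table, the same logic can also be expressed as a single Spark SQL query. This is only a sketch: user_behavior is a placeholder for your actual table name, and the group column is renamed to grp to avoid the PARTITION keyword.

// Same logic in Spark SQL; grp plays the role of the "partition" column above
val resultSql = spark.sql("""
  SELECT userid,
         grp,
         MAX(`timestamp`) - MIN(`timestamp`) AS duration
  FROM (
    SELECT userid,
           `timestamp`,
           SUM(CASE WHEN behavior = 'view' THEN 1 ELSE 0 END)
             OVER (PARTITION BY userid ORDER BY `timestamp`) AS grp
    FROM user_behavior
  ) t
  GROUP BY userid, grp
""")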