Here is the scenario. Assuming I have the following table:
identifier | line |
---|---|
51169081604 | 2 |
00034886044 | 22 |
51168939455 | 52 |
The challenge is to, for every single column line, select the next biggest column line, which I have accomplished by the following SQL:
SELECT i1.line,i1.identifier,
MAX(i1.line) OVER (
ORDER BY i1.line ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING
)AS parent
FROM global_temp.documentIdentifiers i1
The challenge is partially solved alright, the problem is, when I execute this code on Spark, the performance is terrible. The warning message is very clear about it:
No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
Partitioning by any of the two fields does not work, it breaks the result, of course, as every created partition is not aware of the other lines.
Does anyone have any clue on how can I " select the next biggest column line" without performance issues?
Thanks
CodePudding user response:
Using your "next" approach AND assuming the data is generated in ascending line order, the following does work in parallel, but if actually faster you can tell me; I do not know your volume of data. In any event you cannot solve just with SQL (%sql).
Here goes:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._
case class X(identifier: Long, line: Long) // Too hard to explain, just gets around issues with df --> rdd --> df.
// Gen some more data.
val df = Seq(
(1000000, 23), (1200, 56), (1201, 58), (1202, 60),
(8200, 63), (890000, 67), (990000, 99), (33000, 123),
(33001, 124), (33002, 126), (33009, 132), (33019, 133),
(33029, 134), (33039, 135), (800, 201), (1800, 999),
(1801, 1999), (1802, 2999), (1800444, 9999)
).toDF("identifier", "line")
// Add partition so as to be able to apply parallelism - except for upper boundary record.
val df2 = df.as[X]
.rdd
.mapPartitionsWithIndex((index, iter) => {
iter.map(x => (index, x ))
}).mapValues(v => (v.identifier, v.line)).map(x => (x._1, x._2._1, x._2._2))
.toDF("part", "identifier", "line")
// Process per partition.
@transient val w = org.apache.spark.sql.expressions.Window.partitionBy("part").orderBy("line")
val df3 = df2.withColumn("next", lead("line", 1, null).over(w))
// Process upper boundary.
val df4 = df3.filter(df3("part") =!= 0).groupBy("part").agg(min("line").as("nxt")).toDF("pt", "nxt")
val df5 = df3.join(df4, (df3("part") === df4("pt") - 1), "outer" )
val df6 = df5.withColumn("next", when(col("next").isNull, col("nxt")).otherwise(col("next"))).select("identifier", "line", "next")
// Display. Sort accordingly.
df6.show(false)
returns:
---------- ---- ----
|identifier|line|next|
---------- ---- ----
|1000000 |23 |56 |
|1200 |56 |58 |
|1201 |58 |60 |
|1202 |60 |63 |
|8200 |63 |67 |
|890000 |67 |99 |
|990000 |99 |123 |
|33000 |123 |124 |
|33001 |124 |126 |
|33002 |126 |132 |
|33009 |132 |133 |
|33019 |133 |134 |
|33029 |134 |135 |
|33039 |135 |201 |
|800 |201 |999 |
|1800 |999 |1999|
|1801 |1999|2999|
|1802 |2999|9999|
|1800444 |9999|null|
---------- ---- ----
You can add additional sorting etc. Relies on narrow transformation when adding partition index. How you load may be an issue. Caching not considered.
If data is not ordered as stated above, a range partitioning needs to occur first.