I have the below dataframe with 2 columns.
+---------+-----------------+
|column_a |column_b         |
+---------+-----------------+
|text book|Music Book Movie |
|book     |BOOK Film Theatre|
|note book|Music Movie Drama|
|rock     |Pop Metal Jazz   |
|hard rock|Blues Rap Rock   |
+---------+-----------------+
I have to filter out rows where any word/token in column_a is also present in column_b of the same row, ignoring case.
For example:
"text book" should get filtered out because at least one of its tokens ("text" or "book") is present in column_b of the 1st row.
Similarly, "hard rock" should get filtered out because at least one of its tokens ("hard" or "rock") is present in column_b.
The 2nd row, "book", should also get filtered out because the word "BOOK" is present in column_b.
So my output dataframe is just:
+---------+-----------------+
|column_a |column_b         |
+---------+-----------------+
|note book|Music Movie Drama|
|rock     |Pop Metal Jazz   |
+---------+-----------------+
I can do this check on plain string values like this:
val columnA = "text book"
val columnB = "Music Book Movie"
// Split on runs of whitespace and lower-case the tokens for a case-insensitive comparison
val tokensColumnA = columnA.split("\\s+").map(_.toLowerCase).toSet
val tokensColumnB = columnB.split("\\s+").map(_.toLowerCase).toSet
// true when the two strings share no token, i.e. the row should be kept
val check: Boolean = tokensColumnA.intersect(tokensColumnB).isEmpty
But I'm not sure how to incorporate this in a Spark dataframe and filter accordingly.
Answer:
import org.apache.spark.sql.functions.{col, udf}
import spark.implicits._
case class D(column_a: String, column_b: String)
val df1 = Seq(
  D("text book", "Music Book Movie"),
  D("book", "BOOK Film Theatre"),
  D("note book", "Music Movie Drama"),
  D("rock", "Pop Metal Jazz"),
  D("hard rock", "Blues Rap Rock")
).toDF()
df1.show(false)
// +---------+-----------------+
// |column_a |column_b         |
// +---------+-----------------+
// |text book|Music Book Movie |
// |book     |BOOK Film Theatre|
// |note book|Music Movie Drama|
// |rock     |Pop Metal Jazz   |
// |hard rock|Blues Rap Rock   |
// +---------+-----------------+
// UDF that returns true when column_a and column_b share no token (case-insensitive)
val checkUDF = udf((columnA: String, columnB: String) => {
  val tokensColumnA = columnA.split("\\s+").map(_.toLowerCase).toSet
  val tokensColumnB = columnB.split("\\s+").map(_.toLowerCase).toSet
  tokensColumnA.intersect(tokensColumnB).isEmpty
})
// Flag every row with the result of the check
val tmpDF = df1.withColumn("isCorrect", checkUDF(col("column_a"), col("column_b")))
tmpDF.show(false)
// +---------+-----------------+---------+
// |column_a |column_b         |isCorrect|
// +---------+-----------------+---------+
// |text book|Music Book Movie |false    |
// |book     |BOOK Film Theatre|false    |
// |note book|Music Movie Drama|true     |
// |rock     |Pop Metal Jazz   |true     |
// |hard rock|Blues Rap Rock   |false    |
// +---------+-----------------+---------+
// Keep only the rows without a shared token
val resDF = tmpDF.filter(col("isCorrect"))
resDF.show(false)
// +---------+-----------------+---------+
// |column_a |column_b         |isCorrect|
// +---------+-----------------+---------+
// |note book|Music Movie Drama|true     |
// |rock     |Pop Metal Jazz   |true     |
// +---------+-----------------+---------+
// Drop the helper column to get the final result
val df = resDF.drop("isCorrect")
df.show(false)
// +---------+-----------------+
// |column_a |column_b         |
// +---------+-----------------+
// |note book|Music Movie Drama|
// |rock     |Pop Metal Jazz   |
// +---------+-----------------+
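As a possible alternative to the UDF above, the same filter can probably be written with Spark's built-in column functions only, which avoids the helper column and keeps the logic visible to the optimizer. This is a minimal sketch, assuming Spark 2.4 or later (where arrays_overlap is available) and a local SparkSession created just for illustration; the helper name tokens and the value names input and filtered are made up for this example.

```scala
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions.{arrays_overlap, col, lower, split, trim}

// Assumption: a local session for illustration; reuse your existing `spark` if you have one.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("token-overlap-sketch")
  .getOrCreate()
import spark.implicits._

val input = Seq(
  ("text book", "Music Book Movie"),
  ("book", "BOOK Film Theatre"),
  ("note book", "Music Movie Drama"),
  ("rock", "Pop Metal Jazz"),
  ("hard rock", "Blues Rap Rock")
).toDF("column_a", "column_b")

// Hypothetical helper: trim, lower-case, and split a string column on runs of whitespace.
def tokens(c: Column): Column = split(lower(trim(c)), "\\s+")

// Keep only the rows whose two token arrays have no element in common.
val filtered = input.filter(
  !arrays_overlap(tokens(col("column_a")), tokens(col("column_b")))
)
filtered.show(false)
```

With the sample data this should keep only the "note book" and "rock" rows. Rows where either column is null are dropped by this filter, because the overlap expression evaluates to null for them; handle nulls explicitly (for example with coalesce) if such rows should be kept.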