I am trying to train a movie recommendation model. I have a dataset containing a list of all the cast members and the movie details with descriptions. Based on the occurrence of a cast member's name in a movie description, I am trying to map the two.
df_castes:
name
0 Mark Wahlberg
1 Leonardo DiCaprio
2 Kate Mara
3 Ben Affleck
df_movies:
name description released
0 Departed South Boston cop Billy Costigan (Leonardo DiCaprio) goes under cover, detective (Mark Wahlberg) who ... 2006
1 Shooter A top Marine sniper, Bob Lee Swagger (Mark Wahlberg), who leaves the military, Sarah (Kate Mara) the widow of... 2007
2 Triple Frontier Former Special Forces operative Tom (Ben Affleck) and his team ...... 2019
I am doing something like this in Python to create the expected DataFrame shown below:
import pandas as pd

c_list = list()
m_list = list()
for index_m, row_m in df_movies.iterrows():
    for index_c, row_c in df_castes.iterrows():
        caste = row_c['name']
        movie = row_m['name']  # the movie name goes in the output if the cast member appears in the description
        if caste in row_m['description']:
            c_list.append(caste)
            m_list.append(movie)

output = {"caste_name": c_list, "movie_name": m_list}
df_output = pd.DataFrame(data=output)
print(df_output)
expected output:
caste_name movie_name
0 Mark Wahlberg Departed
1 Leonardo DiCaprio Departed
2 Mark Wahlberg Shooter
3 Kate Mara Shooter
4 Ben Affleck Triple Frontier
I am trying to do this the Scala way, using map or filter, and create the new expected DataFrame. Any idea how I can use filter or map to do this?
CodePudding user response:
If you're starting out with two Scala collections you could create a Spark DataFrame as follows:
val castes = Seq("Mark Wahlberg", "Leonardo DiCaprio", "Kate Mara", "Ben Affleck")
val movies = Seq(
  ("Departed", "South Boston cop Billy Costigan (Leonardo DiCaprio) goes under cover, detective (Mark Wahlberg) who ...", 2006),
  ("Shooter", "A top Marine sniper, Bob Lee Swagger (Mark Wahlberg), who leaves the military, Sarah (Kate Mara) the widow of...", 2007),
  ("Triple Frontier", "Former Special Forces operative Tom (Ben Affleck) and his team ......", 2019)
)

val df1 = castes.flatMap(r => {
  movies
    .filter(x => x._2.contains(r))  // keep movies whose description mentions this cast member
    .map(s => (r, s._1))            // pair the cast member with the movie title
}).toDF("caste_name", "movie_name")
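Note that .toDF on a plain Scala collection needs import spark.implicits._ in scope (it's imported just below). With the sample data above, df1.show() should print something like this, cast-member-major because castes drives the outer flatMap:

df1.show()
// +-----------------+---------------+
// |       caste_name|     movie_name|
// +-----------------+---------------+
// |    Mark Wahlberg|       Departed|
// |    Mark Wahlberg|        Shooter|
// |Leonardo DiCaprio|       Departed|
// |        Kate Mara|        Shooter|
// |      Ben Affleck|Triple Frontier|
// +-----------------+---------------+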
If you already have two Spark DataFrames, you will struggle to get something nested working, I think. As suggested in the comment to your post, you're just better off doing a cross-join and filtering the result. Something like:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import spark.implicits._

val df_castes: DataFrame = castes.toDF("name")
val df_movies: DataFrame = movies.toDF("name", "description", "released")

val df2 = df_movies
  .withColumnRenamed("name", "movie_name")
  .crossJoin(df_castes.withColumnRenamed("name", "caste_name"))
  .filter(col("description") contains col("caste_name"))
  .select("caste_name", "movie_name")
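For what it's worth, df2.show() on the sample data happens to come out movie-major, matching the expected output in the question row for row (although Spark makes no ordering guarantee in general):

df2.show()
// +-----------------+---------------+
// |       caste_name|     movie_name|
// +-----------------+---------------+
// |    Mark Wahlberg|       Departed|
// |Leonardo DiCaprio|       Departed|
// |    Mark Wahlberg|        Shooter|
// |        Kate Mara|        Shooter|
// |      Ben Affleck|Triple Frontier|
// +-----------------+---------------+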
I hope I've understood the setup correctly...
Edit: In response to the comment about why a cross join and not an inner join... I don't know. Either way we need to assess every pair in the cross product, and I trusted Spark to optimise things. I like that the cross join makes it explicit that all pairs are enumerated. I'm not sure what the performance differences would be over large collections. The plans are:
scala> :paste
// Entering paste mode (ctrl-D to finish)
df_movies
  .withColumnRenamed("name", "movie_name")
  .crossJoin(df_castes.withColumnRenamed("name", "caste_name"))
  .filter(col("description") contains col("caste_name"))
  .select("caste_name", "movie_name")
  .explain
// Exiting paste mode, now interpreting.
== Physical Plan ==
*(1) Project [caste_name#118, movie_name#114]
+- BroadcastNestedLoopJoin BuildRight, Cross, Contains(description#13, caste_name#118)
   :- LocalTableScan [movie_name#114, description#13]
   +- BroadcastExchange IdentityBroadcastMode
      +- LocalTableScan [caste_name#118]
versus
scala> :paste
// Entering paste mode (ctrl-D to finish)
df_movies
  .withColumnRenamed("name", "movie_name")
  .join(df_castes.withColumnRenamed("name", "caste_name"), col("description") contains col("caste_name"), "inner")
  .select("caste_name", "movie_name")
  .explain
// Exiting paste mode, now interpreting.
== Physical Plan ==
*(1) Project [caste_name#131, movie_name#127]
+- BroadcastNestedLoopJoin BuildRight, Inner, Contains(description#13, caste_name#131)
   :- LocalTableScan [movie_name#127, description#13]
   +- BroadcastExchange IdentityBroadcastMode
      +- LocalTableScan [caste_name#131]
Edit 2: I guess that was a lazy response; the plans come out identical apart from the join type. In the plain Scala case, it's the difference between:
castes.flatMap(r => {
  movies
    .filter(x => x._2.contains(r))
    .map(s => (r, s._1))
})
and
castes.flatMap(r => {
  movies
    .map(s => (r, s._1, s._2))
    .filter(x => x._3 contains x._1)
})
So point taken.
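For completeness, the same filter-before-map shape can also be written as a for-comprehension, something like the sketch below. A guard in a for-comprehension desugars to withFilter followed by map, so no pair is built for non-matching combinations:

// same pairs as castes.flatMap(... .filter(...).map(...))
val pairs = for {
  caste <- castes
  movie <- movies
  if movie._2.contains(caste)  // guard runs before the yield
} yield (caste, movie._1)

// pairs: Seq[(String, String)], ready for .toDF("caste_name", "movie_name")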
CodePudding user response:
Another way to create the required structure is by using foldLeft:
val castes = Seq(
  "Mark Wahlberg",
  "Leonardo DiCaprio",
  "Kate Mara",
  "Ben Affleck"
)

val movies = Seq(
  (
    "Departed",
    "South Boston cop Billy Costigan (Leonardo DiCaprio) goes under cover, detective (Mark Wahlberg) who ...",
    2006
  ),
  (
    "Shooter",
    "A top Marine sniper, Bob Lee Swagger (Mark Wahlberg), who leaves the military, Sarah (Kate Mara) the widow of...",
    2007
  ),
  (
    "Triple Frontier",
    "Former Special Forces operative Tom (Ben Affleck) and his team ......",
    2019
  )
)
val result = castes.foldLeft(List[(String, String)]()) { (r1, caste) =>
  // for each cast member, append every movie whose description mentions them
  r1 ::: movies.foldLeft(List[(String, String)]()) { (r2, m) =>
    if (m._2.contains(caste)) r2 :+ ((caste, m._1)) else r2
  }
}
println(result)
// convert result to DF
val df = result.toDF("caste_name", "movie_name")
df.show
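With the sample data, result should come out cast-member-major (because castes drives the outer foldLeft); note that .toDF again needs import spark.implicits._ in scope:

// println(result) should print something like:
// List((Mark Wahlberg,Departed), (Mark Wahlberg,Shooter),
//      (Leonardo DiCaprio,Departed), (Kate Mara,Shooter),
//      (Ben Affleck,Triple Frontier))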