Home > database >  Dataframe in Scala
Dataframe in Scala

Time:04-27

I am trying to train the model for recommendation for movie. I have a dataset which has list of all the casts, movie details with description. based on the occurance of name of cast in movie description, I am trying to map it.

df_castes:

                name
0      Mark Wahlberg
1  Leonardo DiCaprio
2          Kate Mara
3        Ben Affleck

df_movies:

              name                                                                                                       description released
0  Departed         South Boston cop Billy Costigan (Leonardo DiCaprio) goes under cover, detective (Mark Wahlberg) who ...           2006   
1  Shooter          A top Marine sniper, Bob Lee Swagger (Mark Wahlberg), who leaves the military, Sarah (Kate Mara) the widow of...  2007   
2  Triple Frontier  Former Special Forces operative Tom (Ben Affleck) and his team ......                                             2019   

I am doing something like this in python to create the expected data frame below:

c_list = list()
m_list = list()
for index_m, row_m in df_movies.iterrows():
    for index_c, row_c in df_castes.iterrows():
        caste = row_c['name']
        movie = row_m['name']  # need to add name of the movie in output if caste is mentioned in description
        if caste in row_m['description']:  
            c_list.append(caste)
            m_list.append(movie)

output = {"caste_name": c_list, "movie_name": m_list}
df_output = pd.DataFrame(data=output)
print(df_output)

expected output:

        caste_name          movie_name
0       Mark Wahlberg       Departed
1       Leonardo DiCaprio   Departed
2       Mark Wahlberg       Shooter
3       Kate Mara           Shooter
4       Ben Affleck         Triple Frontier

I am trying to do it in scala way using map or filter and create new expected dataframe. Any idea on how can use filter or map to do this?

CodePudding user response:

If you're starting out with two Scala collections you could create a Spark DataFrame as follows:

val castes = Seq("Mark Wahlberg", "Leonardo DiCaprio", "Kate Mara", "Ben Affleck")
val movies = Seq(
  ("Departed", "South Boston cop Billy Costigan (Leonardo DiCaprio) goes under cover, detective (Mark Wahlberg) who ...", 2006),
  ("Shooter", "A top Marine sniper, Bob Lee Swagger (Mark Wahlberg), who leaves the military, Sarah (Kate Mara) the widow of...", 2007),
  ("Triple Frontier", "Former Special Forces operative Tom (Ben Affleck) and his team ......", 2019)
)

val df1 = castes.flatMap(r => {
  movies
    .filter(x => x._2.contains(r))
    .map(s => {
      (r, s._1)
    })
}).toDF("caste_name", "movie_name")

If you already have two Spark DataFrames, you will struggle to get something nested working, I think. As suggested in the comment to your post, you're just better off doing a cross-join and filtering the result. Something like:

import spark.implicits._
import org.apache.spark.sql.functions._

val df_castes: DataFrame = castes.toDF("name")
val df_movies: DataFrame = movies.toDF("name", "description", "released")

val df2 = df_movies
  .withColumnRenamed("name", "movie_name")
  .crossJoin(df_castes.withColumnRenamed("name", "caste_name"))
  .filter(col("description") contains col("caste_name"))
  .select("caste_name", "movie_name")

I hope I've understood the setup correctly...

Edit: In response to the comment about why a cross join and not an inner join... I don't know. Either way we need to assess every pair in the cross, and I guess I trusted Spark to optimise things. I like that the cross join makes it explicit that all pairs are enumerated. Not sure what the performance differences would be over large collectoins. The plans are:

scala> :paste
// Entering paste mode (ctrl-D to finish)

df_movies
  .withColumnRenamed("name", "movie_name")
  .crossJoin(df_castes.withColumnRenamed("name", "caste_name"))
  .filter(col("description") contains col("caste_name"))
  .select("caste_name", "movie_name")
  .explain

// Exiting paste mode, now interpreting.

== Physical Plan ==
*(1) Project [caste_name#118, movie_name#114]
 - BroadcastNestedLoopJoin BuildRight, Cross, Contains(description#13, caste_name#118)
   :- LocalTableScan [movie_name#114, description#13]
    - BroadcastExchange IdentityBroadcastMode
       - LocalTableScan [caste_name#118]

versus

scala> :paste
// Entering paste mode (ctrl-D to finish)

df_movies
  .withColumnRenamed("name", "movie_name")
  .join(df_castes.withColumnRenamed("name", "caste_name"), col("description") contains col("caste_name"), "inner")
  .select("caste_name", "movie_name")
  .explain

// Exiting paste mode, now interpreting.

== Physical Plan ==
*(1) Project [caste_name#131, movie_name#127]
 - BroadcastNestedLoopJoin BuildRight, Inner, Contains(description#13, caste_name#131)
   :- LocalTableScan [movie_name#127, description#13]
    - BroadcastExchange IdentityBroadcastMode
       - LocalTableScan [caste_name#131]

Edit 2: I guess maybe it was a lazy response. In the plain Scala case, I guess it's the difference between?:

castes.flatMap(r => {
  movies
    .filter(x => x._2.contains(r))
    .map(s => {
      (r, s._1)
    })
})

and

castes.flatMap(r => {
  movies
    .map(s => {
      (r, s._1, s._2)
    }).filter(x => x._3 contains x._1)
})

So point taken.

CodePudding user response:

Another way you can create the require structure by using foldLeft

val castes = Seq(
  "Mark Wahlberg",
  "Leonardo DiCaprio",
  "Kate Mara",
  "Ben Affleck"
)
val movies = Seq(
  (
    "Departed",
    "South Boston cop Billy Costigan (Leonardo DiCaprio) goes under cover, detective (Mark Wahlberg) who ...",
    2006
  ),
  (
    "Shooter",
    "A top Marine sniper, Bob Lee Swagger (Mark Wahlberg), who leaves the military, Sarah (Kate Mara) the widow of...",
    2007
  ),
  (
    "Triple Frontier",
    "Former Special Forces operative Tom (Ben Affleck) and his team ......",
    2019
  )
)

val result = castes.foldLeft(
  List[(String, String)]()
)(
  (r1, caste) => {
    r1 ::: movies.foldLeft(
      List[(String, String)]()
    )(
      (r2, m) => {
        if (m._2.contains(caste)) r2 :  (caste, m._1) else r2
      }
    )
  }
)
println(result)
// convert result to DF
val df = result.toDF("caste_name", "movie_name")
df.show
  • Related