I'm getting this error:

line 23, in parseRating
IndexError: list index out of range

upon any attempt at .collect(), .count(), etc. So the final line, df3.collect(), throws that error, but all the .show() calls work. I don't think it's a problem with the data, but I could be wrong. New to this, really not sure what's going on. Any help greatly appreciated.
import os
from os import remove, removedirs
from os.path import join, isfile, dirname

import pandas as pd

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode
def parseRating(line):
    """
    Parses a rating record in MovieLens format userId::movieId::rating::timestamp .
    """
    fields = line.strip().split("::")
    return int(fields[3]), int(fields[0]), int(fields[1]), float(fields[2])
    #return int(fields[0]), int(fields[1]), float(fields[2])
if __name__ == "__main__":
    # set up environment
    spark = SparkSession.builder \
        .master("local") \
        .appName("Movie Recommendation Engine") \
        .config("spark.driver.memory", "16g") \
        .getOrCreate()
    sc = spark.sparkContext

    # load personal ratings
    #myRatings = loadRatings(os.path.abspath('personalRatings.txt'))
    myRatingsRDD = sc.textFile("personalRatings.txt").map(parseRating)
    ratings = sc.textFile("ratings.dat").map(parseRating)

    df1 = spark.createDataFrame(myRatingsRDD, ["timestamp", "userID", "movieID", "rating"])
    df1.show()
    df2 = spark.createDataFrame(ratings, ["timestamp", "userID", "movieID", "rating"])
    df2.show()

    df3 = df1.union(df2)
    df3.show()
    df3.printSchema()

    df3 = df3.\
        withColumn('userID', col('userID').cast('integer')).\
        withColumn('movieID', col('movieID').cast('integer')).\
        withColumn('rating', col('rating').cast('float')).\
        drop('timestamp')
    df3.show()

    ratings = df3
    df3.collect()
CodePudding user response:
One of the lines in your text file may be malformed or incomplete, so the split("::") may not produce the expected number of fields. That would also explain why .show() works while .collect() fails: .show() is lazy and only evaluates the first 20 rows, whereas .collect() (or .count()) forces Spark to parse the entire file, including any bad line. You can update your function to check the number of splits before accessing the indexes, e.g.:
def parseRating(line):
    """
    Parses a rating record in MovieLens format userId::movieId::rating::timestamp .
    Returns None for any field that is missing instead of raising IndexError.
    """
    fields = line.strip().split("::")
    timestamp = int(fields[3]) if len(fields) > 3 else None
    userId = int(fields[0]) if len(fields) > 0 else None
    movieId = int(fields[1]) if len(fields) > 1 else None
    rating = float(fields[2]) if len(fields) > 2 else None
    return timestamp, userId, movieId, rating
You can even do more exception handling if desired; a sketch of one option follows.
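For instance, here is a minimal sketch (not from the original answer, and assuming every valid record has exactly four ::-separated fields) that drops malformed lines entirely before parsing, so no None values ever reach the DataFrame:

# Hypothetical alternative: keep only lines that split into exactly
# four fields, then parse; malformed lines are silently dropped.
raw = sc.textFile("ratings.dat").map(lambda line: line.strip().split("::"))
valid = raw.filter(lambda fields: len(fields) == 4)
ratings = valid.map(lambda f: (int(f[3]), int(f[0]), int(f[1]), float(f[2])))

Dropping bad rows up front also means the later casts never have to deal with nulls.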
Let me know if this works for you.
CodePudding user response:
How about reading the text file directly into a DataFrame, specifying the field separator and whether there is a header, and then modifying the column datatypes with cast? Something like this:
# Note: multi-character delimiters in the CSV reader require Spark 3.0+.
# MovieLens-style files have no header row, so name the columns explicitly.
df1 = spark.read.format("csv") \
    .option("header", "false") \
    .option("delimiter", "::") \
    .load("personalRatings.txt") \
    .toDF("userId", "movieId", "rating", "timestamp")
df1 = df1.select(df1.timestamp.cast("int"), df1.userId.cast("int"),
                 df1.movieId.cast("int"), df1.rating.cast("float"))
df1.show(10)
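If malformed lines are still a concern with this approach, a hedged variant (again assuming Spark 3.0+ for the "::" delimiter, and the question's userId::movieId::rating::timestamp layout) is to pass an explicit schema and let the reader discard rows that do not match it via the DROPMALFORMED mode:

from pyspark.sql.types import StructType, StructField, IntegerType, FloatType

# Explicit schema in the file's userId::movieId::rating::timestamp order.
schema = StructType([
    StructField("userId", IntegerType()),
    StructField("movieId", IntegerType()),
    StructField("rating", FloatType()),
    StructField("timestamp", IntegerType()),
])

df1 = spark.read.format("csv") \
    .schema(schema) \
    .option("delimiter", "::") \
    .option("mode", "DROPMALFORMED") \
    .load("personalRatings.txt")
df1.show(10)

With the schema supplied up front, the later cast calls become unnecessary, and rows that fail to parse are dropped at read time instead of blowing up later at collect().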