PySpark .collect() error - IndexError: list index out of range


I'm getting this error:

line 23, in parseRating
IndexError: list index out of range

...upon any attempt at .collect(), .count(), etc. So the final line, df3.collect(), throws that error, but all of the .show() calls work. I don't think it's a problem with the data, but I could be wrong.

New to this, really not sure what's going on. Any help greatly appreciated.

import os
from os import remove, removedirs
from os.path import join, isfile, dirname

import pandas as pd
from pyspark.sql.functions import col, explode
from pyspark import SparkContext
from pyspark.sql import SparkSession


def parseRating(line):
    """
    Parses a rating record in MovieLens format userId::movieId::rating::timestamp .
    """
    fields = line.strip().split("::")

    return int(fields[3]), int(fields[0]), int(fields[1]), float(fields[2])
    #return int(fields[0]), int(fields[1]), float(fields[2])

if __name__ == "__main__":

    # set up environment
    spark = SparkSession.builder \
        .master("local") \
        .appName("Movie Recommendation Engine") \
        .config("spark.driver.memory", "16g") \
        .getOrCreate()

    sc = spark.sparkContext

    # load personal ratings
    #myRatings = loadRatings(os.path.abspath('personalRatings.txt'))

    myRatingsRDD = sc.textFile("personalRatings.txt").map(parseRating)

    ratings = sc.textFile("ratings.dat").map(parseRating)

    df1 = spark.createDataFrame(myRatingsRDD, ["timestamp", "userID", "movieID", "rating"])
    df1.show()

    df2 = spark.createDataFrame(ratings, ["timestamp", "userID", "movieID", "rating"])
    df2.show()

    df3 = df1.union(df2)
    df3.show()

    df3.printSchema()

    # cast the columns to their proper types and drop the timestamp
    df3 = df3.\
        withColumn('userID', col('userID').cast('integer')).\
        withColumn('movieID', col('movieID').cast('integer')).\
        withColumn('rating', col('rating').cast('float')).\
        drop('timestamp')
    df3.show()
    ratings = df3

    # this is the call that raises the IndexError
    df3.collect()

CodePudding user response:

One of the lines in your text file may be malformed or incomplete, so the split("::") may not produce the expected number of fields. (This would also explain why .show() works while .collect() fails: .show() only evaluates the first few rows, whereas .collect() and .count() scan the whole file and eventually hit the bad line.) You can update your function to check the number of fields before accessing the indices. E.g.

def parseRating(line):
    """
    Parses a rating record in MovieLens format userId::movieId::rating::timestamp .
    """
    fields = line.strip().split("::")
    timestamp = int(fields[3]) if len(fields)>3 else None
    userId = int(fields[0]) if len(fields)>0 else None
    movieId = int(fields[1]) if len(fields)>1 else None
    rating = float(fields[2]) if len(fields)>2 else None

    return timestamp, userId, movieId, rating

You can add more exception handling if desired.
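For instance, a stricter variant (just a sketch; parseRatingSafe is an illustrative name, not something from your code) wraps the conversions in try/except so that lines with non-numeric fields are tolerated as well:

def parseRatingSafe(line):
    """
    Like parseRating, but returns a row of Nones for any malformed line
    instead of raising IndexError or ValueError.
    """
    fields = line.strip().split("::")
    try:
        return int(fields[3]), int(fields[0]), int(fields[1]), float(fields[2])
    except (IndexError, ValueError):
        return None, None, None, None

You could then drop the bad rows before building the DataFrame, e.g. sc.textFile("ratings.dat").map(parseRatingSafe).filter(lambda r: r[0] is not None).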

Let me know if this works for you.

CodePudding user response:

How about reading the text file directly into a DataFrame, specifying the field separator and whether there is a header, and then casting the columns to the desired types with cast?

Something like this:

# note: header "true" assumes the first line of the file names the columns
df1 = spark.read.format("csv") \
          .option("header", "true") \
          .option("delimiter", "::") \
          .load("personalRatings.txt")

# the CSV reader produces string columns, so cast them to the desired types
df1 = df1.select(df1.timestamp.cast("int"), df1.userId.cast("int"),
                 df1.movieId.cast("int"), df1.rating.cast("float"))

df1.show(10)
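If the files have no header row (the standard MovieLens .dat files don't), a variant of this (a sketch; note that multi-character delimiters like "::" are only supported by the CSV reader in Spark 3.0+) is to supply the schema yourself and let the reader drop malformed lines:

from pyspark.sql.types import StructType, StructField, IntegerType, LongType, FloatType

# field order matches the file layout: userId::movieId::rating::timestamp
schema = StructType([
    StructField("userId", IntegerType()),
    StructField("movieId", IntegerType()),
    StructField("rating", FloatType()),
    StructField("timestamp", LongType()),
])

df1 = spark.read.format("csv") \
          .option("header", "false") \
          .option("delimiter", "::") \
          .option("mode", "DROPMALFORMED") \
          .schema(schema) \
          .load("personalRatings.txt")

df1.show(10)

With mode set to DROPMALFORMED, rows that don't match the schema are silently discarded, which sidesteps the IndexError-style failure entirely.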