Reading multiple csv files with different numbers of columns into a single spark dataframe in databr

Time:07-08

I have a blob storage container in Azure and I want to load all of the .csv files in the container into a single Spark dataframe. All the files have the same first 2 columns ('name', 'time'). I do some transformations on the time column to convert it into a datetime field, and I also create a new id column based on the filename and move it so that it is the first column. All remaining columns are consistent in their naming format; however, some files have more of these additional columns than others. For example:

One file could be like this:

id name time LV01 LV02 LV03
abc name1 01/01/1900 01:00:00 47.96 23.10 43.00

Whereas the next file might have columns that go up to LV15, and another might have columns that go up to LV25 etc.

I am using the following code to load my data and initially it seems to be working:

from pyspark.sql.functions import *


file_location = 'dbfs:/mnt/<container>/<foldername>'
file_type = "csv"
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

df3 = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location) \
  .withColumn("id",substring(input_file_name(), 45, 3)) #create id column from filename   

#df3.select([df3.columns[-1]] + df3.columns[:-1]).show() #move id to first column
df3 = df3.select([df3.columns[-1]] + df3.columns[:-1]) #move id column to beginning
df3 = df3.withColumn("time", (col('time')/1000000000)) #convert nanoseconds into seconds as pyspark doesn't have nanoseconds
df3 = df3.withColumn("time",from_unixtime(col('time'))) #convert seconds into datetime values
df3.show() 

I've checked that the files have loaded into the dataframe by checking the id column and confirming that the ids were generated correctly (which they were). The issue is with the number of columns: when I display the dataframe, I get inconsistent results. I only get columns up to CH14, and I know some files go up to CH25 etc. Am I missing something? I need all of the columns to be present, because next I need to perform the equivalent of the pandas melt operation (which I have done in Python as follows):

cols = df.iloc[:,3:-1:]
col_names = list(cols.columns.values)
col_names
df_long = df.melt(id_vars=['id','name', 'time'], var_name='channel',value_vars=col_names, value_name='value')
df_long.head()

I can't perform this step (or some equivalent) in Databricks until I know the columns are being pulled through correctly. When I'm loading the files, does Spark only load the columns that are consistent across all files?
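For reference, here is a hedged sketch of how the pandas melt step above might look in PySpark once all columns are present. It builds a stack(...) expression for selectExpr, which assumes the value columns all share one type. The helper names stack_expr and melt_channels are made up for illustration, and unlike the iloc slice above this sketch treats every non-id column as a value column. On Spark 3.4 and later, DataFrame.unpivot (alias melt) should cover the same ground directly.

```python
def stack_expr(value_cols, var_name="channel", value_name="value"):
    """Build a Spark SQL stack() expression that unpivots value_cols.

    Hypothetical helper for illustration; not part of PySpark.
    """
    if not value_cols:
        raise ValueError("need at least one value column")
    # Each pair is: a string literal with the column name, then the column itself.
    pairs = ", ".join(f"'{c}', `{c}`" for c in value_cols)
    return f"stack({len(value_cols)}, {pairs}) as ({var_name}, {value_name})"


def melt_channels(df, id_cols=("id", "name", "time")):
    """Return a long DataFrame: the id columns plus (channel, value) rows."""
    value_cols = [c for c in df.columns if c not in id_cols]
    # Backticks guard against column names that clash with SQL keywords.
    quoted_ids = [f"`{c}`" for c in id_cols]
    return df.selectExpr(*quoted_ids, stack_expr(value_cols))
```

Calling melt_channels(df3) would then be the rough counterpart of the df.melt(...) call, with rows for missing channels carrying null values.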

CodePudding user response:

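Since version 3.1, PySpark's unionByName accepts an allowMissingColumns argument:

df = df1.unionByName(df2, allowMissingColumns=True)

This unions two DataFrames that have different sets of columns, filling any column missing from one side with nulls. More details are in the API doc.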
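A minimal sketch of how this could be applied to a whole folder: read each file on its own, so that every file keeps its own header, then fold the results together. The function names and the paths argument are assumptions for illustration (on Databricks the list of paths might come from dbutils.fs.ls). inferSchema is left out here, so every column comes back as a string and would need casting afterwards.

```python
from functools import reduce


def union_by_name(dfs):
    """Union DataFrames by column name, null-filling columns a file lacks."""
    dfs = list(dfs)
    if not dfs:
        raise ValueError("no DataFrames to union")
    return reduce(
        lambda left, right: left.unionByName(right, allowMissingColumns=True),
        dfs,
    )


def read_csv_folder(spark, paths):
    """Read each CSV path separately (one header per file) and union them.

    `spark` is an existing SparkSession; `paths` is a list of file paths.
    """
    frames = [spark.read.option("header", "true").csv(p) for p in paths]
    return union_by_name(frames)
```

Reading files one at a time is slower than a single load over the folder, so this is likely only practical for a moderate number of files.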

CodePudding user response:

I have figured out a solution to this. Instead of using the inferSchema option, I can simply set the schema manually so that it includes every column that appears in any of the files:

from pyspark.sql.types import StructType, StringType

# Every column is declared as a nullable string
schema = (
    StructType()
    .add("name", StringType(), True)
    .add("time", StringType(), True)
    .add("LV01", StringType(), True)
    .add("LV02", StringType(), True)
    .add("LV03", StringType(), True)
    .add("LV04", StringType(), True)
    .add("LV05", StringType(), True)
    .add("LV06", StringType(), True)
    .add("LV07", StringType(), True)
    # etc etc, up to the highest-numbered column in any file
)

Once this is done, you can just pass the schema option into the load code:

# file_type and file_location are the same variables defined in the question
first_row_is_header = "true"
delimiter = ","

df = spark.read.format(file_type) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .schema(schema) \
  .load(file_location)

This works great for my example, although for files with 100s of columns, there will probably be a more efficient method. Still, this works for what I need to do.
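For files with hundreds of columns, the schema could be generated rather than typed out. Below is a rough sketch, assuming the extra columns follow the zero-padded LV01, LV02, ... pattern from the question and that the highest index across all files is known in advance. The helper names are hypothetical.

```python
def channel_names(max_index, prefix="LV"):
    """Return ['LV01', 'LV02', ...] up to max_index (zero-padded to 2 digits)."""
    return [f"{prefix}{i:02d}" for i in range(1, max_index + 1)]


def build_schema(max_index, prefix="LV"):
    """Build an all-string schema: name, time, then the channel columns."""
    # Imported here so channel_names can be used without PySpark installed.
    from pyspark.sql.types import StringType, StructField, StructType

    names = ["name", "time"] + channel_names(max_index, prefix)
    return StructType([StructField(n, StringType(), True) for n in names])
```

With files that go up to LV25, schema = build_schema(25) could be passed to .schema(schema) in the load code above. As far as I know, the CSV reader applies an explicit schema by position rather than by header name, so this relies on every file listing its columns in the same order.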
