Force Spark to read CSV values as FloatType when the numbers are represented explicitly as floats


I have a tab-delimited CSV file with hundreds of columns. I wrote the values with an explicit float suffix (e.g. 2.0F) so that they would be read as floats, but Spark infers them as double. This is troubling because the data is huge and casting all the columns again after the read is to be avoided. I could not find any clue upon searching, so I am wondering whether there is a solution to this problem. I am using Spark 3.3, and the issue is demonstrated below:

$ cat test.csv
Word    Wt1     Wt2
hello   1.0F    2.0F
hi      2.0F    4.0F

In spark-shell:

scala> val x = 2.0F
val x: Float = 2.0

scala> val df = sqlContext.read.format("csv").option("delimiter", "\t").option("header", "true").option("inferSchema", "true").csv("test.csv")
val df: org.apache.spark.sql.DataFrame = [Word: string, Wt1: double ... 1 more field]

scala> df.show()
+-----+---+---+
| Word|Wt1|Wt2|
+-----+---+---+
|hello|1.0|2.0|
|   hi|2.0|4.0|
+-----+---+---+

scala> df.dtypes
val res6: Array[(String, String)] = Array((Word,StringType), (Wt1,DoubleType), (Wt2,DoubleType))

PS: My use case has strings in the first 5 columns and the rest are all floats; however, the exact number of columns is not known a priori.

PS2: The solution suggested by @leleogere worked perfectly in spark-shell, but including it in my code produced overload errors. The reason was that my two column lists were built as ListBuffers, which needed to be converted to Arrays, as sketched below.
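A minimal sketch of that conversion (the ListBuffer contents here are hypothetical stand-ins for the actual column lists):

import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.types.{StructType, StructField, StringType, FloatType}

// Column names accumulated in ListBuffers (hypothetical values)
val stringCols = ListBuffer("Word")
val floatCols = ListBuffer("Wt1", "Wt2")

// StructType's constructor expects an Array[StructField], so convert
// the ListBuffers with .toArray before building the fields:
val schema = StructType(
  stringCols.toArray.map(c => StructField(c, StringType)) ++
  floatCols.toArray.map(c => StructField(c, FloatType))
)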

CodePudding user response:

You can manually specify the schema to the CSV reader instead of letting it infer the schema by itself:

import org.apache.spark.sql.types.{StructType, StructField, StringType, FloatType}

val numberOfStringColumns = 1  // 5 in your case

val columnsNames = spark.read.option("delimiter", "\t").option("header", "true").csv("test.csv").columns
// I don't think that the above line is a problem because
// no data is actually read as we don't call any action

val (stringCols, floatCols) = columnsNames.splitAt(numberOfStringColumns)
// the above line assumes that the string columns are the first ones of the dataframe

val schema = StructType(
  stringCols.map(c => StructField(c, StringType)) ++
  floatCols.map(c => StructField(c, FloatType))
)

val df = spark.read.format("csv").option("delimiter", "\t").option("header", "true").schema(schema).csv("test.csv")

df.show()
// +-----+---+---+
// | Word|Wt1|Wt2|
// +-----+---+---+
// |hello|1.0|2.0|
// |   hi|2.0|4.0|
// +-----+---+---+

df.dtypes
// Array((Word,StringType), (Wt1,FloatType), (Wt2,FloatType))
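
A side note on why this works even though the values carry an explicit F suffix: Java's Float.parseFloat and Double.parseDouble accept a trailing f/F/d/D type suffix, and Spark's CSV parsing appears to rely on them, which would also explain why inferSchema reads "2.0F" as a double rather than a string. You can check the suffix handling directly in spark-shell:

scala> java.lang.Float.parseFloat("2.0F")
val res0: Float = 2.0

scala> "2.0F".toDouble
val res1: Double = 2.0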