While trying to write a DataFrame to HDFS, I ran into the following issue:
org.apache.spark.sql.AnalysisException: Attribute name " "someName1"" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.;
The source of the DataFrame is a CSV file:
"value_hash", "someName1"
79000000000, name1
79000000000, name2
The CSV is read with:
val dataFrame = spark.read.option("header", "true").csv(path)
Then I select columns and cast their types:
def castColumn(colName: String, colType: String): Column =
  col(colName).cast(DataType.fromJson(colType))

val featureColumns: Seq[(PathString, String)] = dataFrame.columns.tail.map(f => (f, "\"string\"")).toSeq
val columns = (schema ++ featureColumns)
  .map { case (colName, colType) => castColumn(colName, colType) }
dataFrame.select(columns.toSeq: _*)
where schema has the type Map[String, String] and the default value Map("value_hash" -> "\"string\"").
Here I merge the dynamically built featureColumns schema into the default schema (I don't know the full source schema in advance; it is created only after reading the CSV from HDFS).
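For reference, the escaped quotes in the type strings are needed because DataType.fromJson parses a JSON document, so a primitive type name has to be passed as a quoted JSON string literal. A minimal check:
import org.apache.spark.sql.types.{DataType, StringType}
// "\"string\"" is the JSON form of the primitive type name, so fromJson returns StringType
assert(DataType.fromJson("\"string\"") == StringType)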
Then I try to write this DataFrame to an HDFS path:
dataFrame
  .repartition(1)
  .write
  .parquet(outputPath)
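(The DataFrame itself is built with the offending column name without complaint; only the write fails, so the attribute-name check seems to happen in the Parquet writer.)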
Following the recommendation in the error message, I tried to alias each column like this:
filteredRemove.columns.foreach {
  ca => filteredRemove.select(col(ca).alias(ca))
}
and like this:
filteredRemove.columns.foreach {
  ca => filteredRemove.select(col(ca).alias(ca)).show()
}
But the result was the same: org.apache.spark.sql.AnalysisException...
Noticing the whitespace and quotation marks in " "someName1"", I also tried to strip them from the alias:
filteredRemove.columns.foreach {
  ca => filteredRemove.select(col(ca).alias(ca.trim.substring(0, ca.length - 1))).show()
}
But without any success: I still face the exception shown above.
What am I doing wrong?
Answer:
Set ignoreLeadingWhiteSpace to true to eliminate the issue already during the load.
Original code
val df_original = spark.read.option("header",true).csv(path)
println((for(c <- df_original.columns) yield s"`$c`").mkString(","))
`value_hash`,` "someName1"`
Ways to rename the column after the load
val df_renamed_1 = df_original.withColumnRenamed(" \"someName1\"", "someName1")
println((for(c <- df_renamed_1.columns) yield s"`$c`").mkString(","))
`value_hash`,`someName1`
val df_renamed_2 = df_original.withColumnRenamed(""" "someName1"""", "someName1")
println((for(c <- df_renamed_2.columns) yield s"`$c`").mkString(","))
`value_hash`,`someName1`
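If many columns are affected, the renaming can be generalized. A sketch, assuming every header only needs whitespace trimming and quote stripping; note that withColumnRenamed returns a new DataFrame, so the result of the fold must be kept:
// Rename every column: trim surrounding whitespace and drop quote characters.
// DataFrames are immutable, so each withColumnRenamed returns a new one.
val df_renamed_all = df_original.columns.foldLeft(df_original) {
  (df, c) => df.withColumnRenamed(c, c.trim.replaceAll("\"", ""))
}
println((for(c <- df_renamed_all.columns) yield s"`$c`").mkString(","))
`value_hash`,`someName1`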
A way to avoid the issue during the load (include option("ignoreLeadingWhiteSpace", true)):
val df_fixed = spark.read.option("header",true).option("ignoreLeadingWhiteSpace",true).csv(path)
println((for(c <- df_fixed.columns) yield s"`$c`").mkString(","))
`value_hash`,`someName1`
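As a side note on the attempts in the question: select returns a new DataFrame and never mutates filteredRemove, so calling it inside foreach and discarding the result cannot change the columns that get written. The renamed (or cleanly loaded) DataFrame has to be the one passed to write.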