Spark (Scala): Attribute name contains invalid character


While trying to write a DataFrame to HDFS, I ran into the following issue:

  org.apache.spark.sql.AnalysisException: Attribute name " "someName1"" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.;

The source for the DataFrame is a CSV file:

       "value_hash", "someName1"
       79000000000, name1
       79000000000, name2

The CSV is read with:

    val dataFrame = spark.read.option("header", "true").csv(path)

Then I select the columns and cast their types for this DataFrame:

    import org.apache.spark.sql.Column
    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.types.DataType

    // Cast a column to the type described by its JSON type string
    def castColumn(colName: String, colType: String): Column =
      col(colName).cast(DataType.fromJson(colType))

    // Every column after the first is a string-typed feature column
    val featureColumns: Seq[(String, String)] = dataFrame.columns.tail.map(f => (f, "\"string\"")).toSeq

    val columns = (schema ++ featureColumns).map { case (colName, colType) => castColumn(colName, colType) }
    dataFrame.select(columns.toSeq: _*)

where schema has type Map[String, String] and a default value of Map("value_hash" -> "\"string\"").

In this code I merge the dynamically built featureColumns into the default schema (I didn't know the full source schema in advance; it is only known after reading the CSV from HDFS).
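
The escaped quotes in the type values are needed because DataType.fromJson expects the JSON representation of a type, for example:

    import org.apache.spark.sql.types.{DataType, StringType}

    // "string" (as a JSON string, hence the escaped quotes) parses to StringType
    assert(DataType.fromJson("\"string\"") == StringType)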

Then I try to write this DataFrame to an HDFS path:

  dataFrame
      .repartition(1)
      .write
      .parquet(outputPath)

Following the recommendation in the error message, I tried to alias each column, like this:

    filteredRemove.columns.foreach {
      ca => filteredRemove.select(col(ca).alias(ca))
    }

and like this:

    filteredRemove.columns.foreach {
      ca => filteredRemove.select(col(ca).alias(ca)).show()
    }

But the result was the same: org.apache.spark.sql.AnalysisException...

Noticing the whitespace and quotation marks in " "someName1"", I also tried to strip them in the alias:

    filteredRemove.columns.foreach {
      ca => filteredRemove.select(col(ca).alias(ca.trim.substring(0, ca.length - 1))).show()
    }

But to no avail: I still face the exception shown above.

What am I doing wrong?

CodePudding user response:

Set ignoreLeadingWhiteSpace to true to eliminate the issue already during the load.

Original code

val df_original = spark.read.option("header",true).csv(path)

println((for(c <- df_original.columns) yield s"`$c`").mkString(","))

`value_hash`,` "someName1"`

Ways to rename the column after the load

val df_renamed_1 = df_original.withColumnRenamed(" \"someName1\"", "someName1")

println((for(c <- df_renamed_1.columns) yield s"`$c`").mkString(","))

`value_hash`,`someName1`

val df_renamed_2 = df_original.withColumnRenamed(""" "someName1"""", "someName1")

println((for(c <- df_renamed_2.columns) yield s"`$c`").mkString(","))

`value_hash`,`someName1`
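
When the header names are not known in advance, all columns can also be renamed in one pass. A minimal sketch (df_renamed_3 is just an illustrative name), assuming the only cleanup needed is trimming whitespace and stripping stray quotes:

val df_renamed_3 = df_original.toDF(df_original.columns.map(_.trim.replaceAll("\"", "")): _*)

println((for(c <- df_renamed_3.columns) yield s"`$c`").mkString(","))

`value_hash`,`someName1`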

A way to avoid the issue during the load (include option("ignoreLeadingWhiteSpace", true))

val df_fixed = spark.read.option("header",true).option("ignoreLeadingWhiteSpace",true).csv(path)

println((for(c <- df_fixed.columns) yield s"`$c`").mkString(","))

`value_hash`,`someName1`
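
As for why the foreach attempts in the question had no effect: DataFrames are immutable, so select returns a new DataFrame, and the one created inside foreach was discarded on every iteration (each call also selected just a single column). A minimal sketch of aliasing all columns in a single select (df_aliased is an illustrative name), assuming the raw header from the original load:

import org.apache.spark.sql.functions.col

// Backticks keep the raw (dirty) column name intact while the alias supplies the clean one
val df_aliased = df_original.select(df_original.columns.map(c => col(s"`$c`").alias(c.trim.replaceAll("\"", ""))): _*)

println((for(c <- df_aliased.columns) yield s"`$c`").mkString(","))

`value_hash`,`someName1`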