I have a Spark DataFrame that I created by reading an EBCDIC data file. Its schema is as follows
scala> myDF.printSchema
root
 |-- DF_RECORD: struct (nullable = true)
 |    |-- ID: string (nullable = true)
 |    |-- BALANCE_AMT: decimal(15,4) (nullable = true)
I can even get it to show its rows, but when I try to write it as CSV, I get the following error:
scala> myDF.write.format("csv").mode("overwrite").option("header", "true").save("/path")
org.apache.spark.sql.AnalysisException: CSV data source does not support struct<ID:string,BALANCE_AMT:decimal(15,4)> data type.;
Is there a column conversion I might have to do before writing?
Spark version 2.4.0-cdh6.2.1
Scala version 2.11.12
CodePudding user response:
Add a selectExpr before writing:
scala> myDF.selectExpr("DF_RECORD.*").write.format("csv").mode("overwrite").option("header", "true").save("/path")
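If you prefer the typed column API over a SQL expression string, an equivalent sketch (against the same myDF) uses select with a star expansion on the struct column:
import org.apache.spark.sql.functions.col

// Expand every field of the DF_RECORD struct into a top-level column, then write as CSV
myDF.select(col("DF_RECORD.*"))
  .write.format("csv").mode("overwrite").option("header", "true").save("/path")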
CodePudding user response:
Convert it to individual columns by accessing the column names in withColumn:
from pyspark.sql.types import StructType, StructField, StringType, Row, DoubleType
import pyspark.sql.functions as F
schema = StructType([
    StructField("DF_RECORD",
        StructType([
            StructField("ID", StringType()),
            StructField("BALANCE_AMT", DoubleType())
        ])
    )
])
df = spark.createDataFrame([Row(Row("1",1000.0))], schema=schema)
df.printSchema()
[Out]:
root
 |-- DF_RECORD: struct (nullable = true)
 |    |-- ID: string (nullable = true)
 |    |-- BALANCE_AMT: double (nullable = true)
df_csv = df \
    .withColumn("ID", F.col("DF_RECORD")["ID"]) \
    .withColumn("BALANCE_AMT", F.col("DF_RECORD")["BALANCE_AMT"]) \
    .drop("DF_RECORD")
df_csv.show()
[Out]:
+---+-----------+
| ID|BALANCE_AMT|
+---+-----------+
|  1|     1000.0|
+---+-----------+
Now, export it as CSV:
df_csv.write.format("csv").mode("overwrite").option("header", "true").save("/content/sample_data/test1.csv")
!cat "/content/sample_data/test1.csv/part-00000-b342e07a-6d41-40b5-afa2-39eeef3b70a2-c000.csv"
[Out]:
ID, BALANCE_AMT
1, 1000.0
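Since the question itself is in the Scala shell, the same flattening can be sketched in Scala against the original myDF (flatDF is just an illustrative name, and the two field names are taken from the schema above):
import org.apache.spark.sql.functions.col

// Pull each struct field out as a top-level column, then drop the struct itself
val flatDF = myDF
  .withColumn("ID", col("DF_RECORD.ID"))
  .withColumn("BALANCE_AMT", col("DF_RECORD.BALANCE_AMT"))
  .drop("DF_RECORD")

// Plain string/decimal columns are supported by the CSV writer
flatDF.write.format("csv").mode("overwrite").option("header", "true").save("/path")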