Spark - Write a DF with Struct Schema as CSV


I have a Spark DataFrame that I created by reading an EBCDIC data file. Its schema is as follows:

scala> myDF.printSchema
 |-- DF_RECORD: struct (nullable = true)
 |    |-- ID: string (nullable = true)
 |    |-- BALANCE_AMT: decimal(15,4) (nullable = true)

I can even get it to show its rows. But when I try to write it as CSV, I get the following error:

scala> myDF.write.format("csv").mode("overwrite").option("header", "true").save("/path")
org.apache.spark.sql.AnalysisException: CSV data source does not support struct<ID:string,BALANCE_AMT:decimal(15,4)> data type.;

Is there a column conversion I might have to do before writing?

Spark version 2.4.0-cdh6.2.1

Scala version 2.11.12

CodePudding user response:

Add a selectExpr before writing, so that the fields of the struct are expanded into top-level columns:

scala> myDF.selectExpr("DF_RECORD.*").write.format("csv").mode("overwrite").option("header", "true").save("/path")
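
If the star expansion is too coarse, for example when a field should be renamed on the way out, the same idea can be spelled out one expression per field. Here is a minimal sketch in Python, assuming a hypothetical helper named `struct_field_exprs` (it is not part of Spark). It only builds the expression strings; these are plain Spark SQL expressions, so they could be passed to `selectExpr` from either Scala or PySpark.

```python
def struct_field_exprs(struct_col, fields, prefix=""):
    """Build one 'struct.field AS alias' expression per struct field.

    Hypothetical helper for illustration; not a Spark API.
    """
    return [f"{struct_col}.{name} AS {prefix}{name}" for name in fields]


# Field names taken from the schema in the question.
exprs = struct_field_exprs("DF_RECORD", ["ID", "BALANCE_AMT"])
print(exprs)

# With a live DataFrame (not executed here), the list would be used as:
#   df.selectExpr(*exprs).write.format("csv") ...
```

Field names that contain dots or spaces would need backtick quoting, which this sketch does not handle.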

CodePudding user response:

Flatten the struct into individual top-level columns by extracting each field by name with withColumn (shown here in PySpark):

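from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
import pyspark.sql.functions as F

# One struct column, DF_RECORD, holding the two fields
schema = StructType([
          StructField("DF_RECORD", StructType([
              StructField("ID", StringType()),
              StructField("BALANCE_AMT", DoubleType())
          ]))
])

df = spark.createDataFrame([Row(Row("1", 1000.0))], schema=schema)
df.printSchema()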

 |-- DF_RECORD: struct (nullable = true)
 |    |-- ID: string (nullable = true)
 |    |-- BALANCE_AMT: double (nullable = true)

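# Pull each field out of the struct, then drop the struct column itself
df_csv = df \
          .withColumn("ID", F.col("DF_RECORD")["ID"]) \
          .withColumn("BALANCE_AMT", F.col("DF_RECORD")["BALANCE_AMT"]) \
          .drop("DF_RECORD")

df_csv.show()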

+---+-----------+
| ID|BALANCE_AMT|
+---+-----------+
|  1|     1000.0|
+---+-----------+

Now, export it as CSV:

df_csv.write.format("csv").mode("overwrite").option("header", "true").save("/content/sample_data/test1.csv")

!cat "/content/sample_data/test1.csv/part-00000-b342e07a-6d41-40b5-afa2-39eeef3b70a2-c000.csv"

ID,BALANCE_AMT
1,1000.0
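
Note that the path passed to save is a directory, which Spark fills with one or more part files; that is why the cat command above points at a long part-... file name inside it. Here is a minimal sketch for reading such a directory back using only the Python standard library. The helper name `read_spark_csv_dir` is hypothetical, and the sketch assumes the output sits on a local filesystem (not HDFS) and that each part file starts with its own header row, as it does when header is set to true.

```python
import csv
import glob
import os


def read_spark_csv_dir(path):
    """Read every part-*.csv file in a Spark output directory into dict rows.

    Hypothetical helper for illustration; marker files such as _SUCCESS
    are skipped because they do not match the part-*.csv pattern.
    """
    rows = []
    for part in sorted(glob.glob(os.path.join(path, "part-*.csv"))):
        with open(part, newline="") as handle:
            # DictReader uses the first line of each part file as the header.
            rows.extend(csv.DictReader(handle))
    return rows
```

For the example above, this would be called as read_spark_csv_dir("/content/sample_data/test1.csv"). All values come back as strings, since CSV carries no type information.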

