Home > database >  Spark CVS load - custom schema - custom object
Spark CVS load - custom schema - custom object

Time:02-01

Encoder<Transaction> encoder = Encoders.bean(Transaction.class);
Dataset<Row> transactionDS = sparkSession
                                    .read()
                                    .format("csv")
                                    .option("header", true)
                                    .option("delimiter", ",")
                                    .option("enforceSchema", false)
                                    .option("multiLine", false)
                                    .schema(encoder.schema())
                                    .load("s3a://xxx/testSchema.csv");
                                    .as(encoder);

    System.out.println("==============schema starts============");
    transactionDS.printSchema();
    System.out.println("==============schema ends============");

transactionDS.show(10, true); // this is the line that bombs.

My CVS is this -

transactionId,accountId
1,2
10,44

I'm printing my schema in the logs - (you see, the columns are now flipped, or sorted - Ah!)

==============schema starts============
root
 |-- accountId: integer (nullable = true)
 |-- transactionId: long (nullable = true)

==============schema ends============

I'm getting below error

Caused by: java.lang.IllegalArgumentException: CSV header does not conform to the schema.
           Header: transactionId, accounted
           Schema: accountId, transactionId
           Expected: accountId but found: transactionId

This is what my Tranaction class looks like.

public class Transaction implements Serializable {

private static final long serialVersionUID = 7648268336292069686L;

private Long transactionId;
private Integer accountId;

public Long getTransactionId() {
    return transactionId;
}

public void setTransactionId(Long transactionId) {
    this.transactionId = transactionId;
}

public Integer getAccountId() {
    return accountId;
}

public void setAccountId(Integer accountId) {
    this.accountId = accountId;
}
}

Question - Why Spark is not able to match my schema? The ordering is messed up. In my CSV, I'm passing transactionid, accountId but spark takes my schema accountId, transctionId. Ah!

CodePudding user response:

do not use encoder.schema to load csv file, its column order may not according to csv.

CodePudding user response:

Unlike parquet csv doesn't have a schema, so it will not apply the correct order, what you can do is to read the csv without:

.schema(encoder.schema())

Then apply the schema to the dataset that you just created.

  • Related