How to override the default timestamp format while reading CSV in PySpark?


Suppose I have the following data in CSV format:

ID|TIMESTAMP_COL
1|03-02-2003 08:37:55.671 PM
2|2003-02-03 08:37:55.671 AM

and my code for reading the above CSV is:

from pyspark.sql.types import *

sch = StructType([StructField("ID",StringType(),False),StructField("TIMESTAMP_COL",StringType(),True)])

df = spark.read \
            .format("csv") \
            .option("encoding", "utf-8") \
            .option("mode", "PERMISSIVE") \
            .option("header", "true") \
            .option("dateFormat", "dd-MM-yyyy") \
            .option("timestampFormat", "dd-MM-yyyy HH:mm:ss.SSS a") \
            .option("delimiter", "|") \
            .option("columnNameOfCorruptRecord", "_corrupt_record") \
            .schema(sch) \
            .load("data.csv")

So, according to the given timestamp format, the record with id '2' should be rejected, since it is in a different format. Instead it gets parsed, but the resulting value is wrong.

The output I am getting is:

df.show(truncate=False)

+-------------+-----------------------+-------------------+
|           ID|          TIMESTAMP_COL|    _corrupt_record|
+-------------+-----------------------+-------------------+
|            1|2003-02-03 08:37:55.671|               null|
|            2|0008-07-26 08:37:55.671|               null|
+-------------+-----------------------+-------------------+

Why is this happening?

CodePudding user response:

Not sure if it helps, but here is what I found:

  1. In your schema the second field is declared as StringType(); shouldn't it be TimestampType()?

I was able to reproduce your results with spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY"). I also tested the other possible values of this parameter:

 object LegacyBehaviorPolicy extends Enumeration {
    val EXCEPTION, LEGACY, CORRECTED = Value
  }

and here is the doc for this parameter:

 .doc("When LEGACY, java.text.SimpleDateFormat is used for formatting and parsing "  
      "dates/timestamps in a locale-sensitive manner, which is the approach before Spark 3.0. "  
      "When set to CORRECTED, classes from java.time.* packages are used for the same purpose. "  
      "The default value is EXCEPTION, RuntimeException is thrown when we will get different "  
      "results.")

So with LEGACY I get the same results as you.

With EXCEPTION Spark throws an exception:

 org.apache.spark.SparkUpgradeException: [INCONSISTENT_BEHAVIOR_CROSS_VERSION.PARSE_DATETIME_BY_NEW_PARSER] You may get a different result due to the upgrading to Spark >= 3.0:

With CORRECTED Spark returns null for both records.
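The same three behaviours can be reproduced without the CSV reader. Below is a minimal sketch (assuming an active SparkSession named spark) that parses the value from record 2 under each policy; the outcomes noted in the comments are the ones described above:

from pyspark.sql.functions import lit, to_timestamp

value = "2003-02-03 08:37:55.671 AM"    # record 2 from the question
pattern = "dd-MM-yyyy HH:mm:ss.SSS a"   # the original pattern with HH

for policy in ["LEGACY", "CORRECTED", "EXCEPTION"]:
    spark.conf.set("spark.sql.legacy.timeParserPolicy", policy)
    try:
        # LEGACY    -> lenient SimpleDateFormat-style parsing: 0008-07-26 08:37:55.671
        # CORRECTED -> the java.time parser cannot parse the value: null
        # EXCEPTION -> the two parsers disagree: SparkUpgradeException at execution time
        spark.range(1).select(to_timestamp(lit(value), pattern).alias(policy)).show(truncate=False)
    except Exception as e:
        print(policy, "->", type(e).__name__)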

It does, however, correctly parse the record with id 1 when I change the pattern to hh instead of HH.
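To see the hour-pattern difference in isolation (hh is the 12-hour clock that pairs with the AM/PM marker a, while HH is the 24-hour clock), here is a small sketch against the same spark session:

from pyspark.sql.functions import lit, to_timestamp

spark.conf.set("spark.sql.legacy.timeParserPolicy", "CORRECTED")

value = "03-02-2003 08:37:55.671 PM"    # record 1 from the question
spark.range(1).select(
    to_timestamp(lit(value), "dd-MM-yyyy hh:mm:ss.SSS a").alias("with_hh"),
    to_timestamp(lit(value), "dd-MM-yyyy HH:mm:ss.SSS a").alias("with_HH"),
).show(truncate=False)
# with_hh -> 2003-02-03 20:37:55.671, with_HH -> null (matching the results above)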

So, with something like this:

from pyspark.sql.types import *

spark.conf.set("spark.sql.legacy.timeParserPolicy","CORRECTED")

sch = StructType([StructField("ID",StringType(),False),StructField("TIMESTAMP_COL",TimestampType(),True), StructField("_corrupt_record", StringType(),True)])

df = spark.read \
            .format("csv") \
            .option("encoding", "utf-8") \
            .option("mode", "PERMISSIVE") \
            .option("header", "true") \
            .option("dateFormat", "dd-MM-yyyy") \
            .option("timestampFormat", "dd-MM-yyyy hh:mm:ss.SSS a") \
            .option("delimiter", "|") \
            .option("columnNameOfCorruptRecord", "_corrupt_record") \
            .schema(sch) \
            .load("dbfs:/FileStore/tables/stack.csv") \
        
df.show(truncate = False)

I am able to get this output:

+---+-----------------------+----------------------------+
|ID |TIMESTAMP_COL          |_corrupt_record             |
+---+-----------------------+----------------------------+
|1  |2003-02-03 20:37:55.671|null                        |
|2  |null                   |2|2003-02-03 08:37:55.671 AM|
+---+-----------------------+----------------------------+

I am getting null here because that is how the Spark parser works: when a value does not match the pattern it assigns null and, I think, the value is not moved to _corrupt_record. So if you want to remove the non-matching timestamps, you can filter out the nulls.

Edit: As mentioned in the comments, I was missing the _corrupt_record column in the schema; it is added now, so you can get the corrupted value if you need it.
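If you only need the rows whose timestamp parsed, here is a short follow-up sketch on the df from the snippet above (the cache() call is there because Spark disallows queries over a raw CSV read that reference only the internal corrupt-record column):

from pyspark.sql.functions import col

# Cache the parsed result before querying _corrupt_record on its own.
df.cache()

parsed = df.filter(col("TIMESTAMP_COL").isNotNull()).drop("_corrupt_record")
rejected = df.filter(col("_corrupt_record").isNotNull()).select("_corrupt_record")

parsed.show(truncate=False)
rejected.show(truncate=False)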
