Suppose I have the following data in CSV format:
ID|TIMESTAMP_COL
1|03-02-2003 08:37:55.671 PM
2|2003-02-03 08:37:55.671 AM
and my code for reading the above CSV is:
from pyspark.sql.types import *
sch = StructType([StructField("ID",StringType(),False),StructField("TIMESTAMP_COL",StringType(),True)])
df = spark.read \
.format("csv") \
.option("encoding", "utf-8") \
.option("mode", "PERMISSIVE") \
.option("header", "true") \
.option("dateFormat", "dd-MM-yyyy") \
.option("timestampFormat", "dd-MM-yyyy HH:mm:ss.SSS a") \
.option("delimiter", "|") \
.option("columnNameOfCorruptRecord", "_corrupt_record") \
.schema(sch) \
.load("data.csv")
So, according to the given timestamp format, I would expect the record with ID '2' to be rejected, since it is in a different format; instead it gets parsed, but the value comes out wrong.
The output I am getting is:
df.show(truncate=False)
+---+-----------------------+---------------+
|ID |TIMESTAMP_COL          |_corrupt_record|
+---+-----------------------+---------------+
|1  |2003-02-03 08:37:55.671|null           |
|2  |0008-07-26 08:37:55.671|null           |
+---+-----------------------+---------------+
Why is this happening?
CodePudding user response:
Not sure if it helps, but here is what I found:
- In your schema the second field is declared as StringType(); shouldn't it be TimestampType()?
I was able to reproduce your results with spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY"). I also tested the other possible values of this parameter:
object LegacyBehaviorPolicy extends Enumeration {
val EXCEPTION, LEGACY, CORRECTED = Value
}
and here is the doc for this parameter:
.doc("When LEGACY, java.text.SimpleDateFormat is used for formatting and parsing "
"dates/timestamps in a locale-sensitive manner, which is the approach before Spark 3.0. "
"When set to CORRECTED, classes from java.time.* packages are used for the same purpose. "
"The default value is EXCEPTION, RuntimeException is thrown when we will get different "
"results.")
So with LEGACY I am getting the same results as you.
With EXCEPTION Spark throws an exception:
org.apache.spark.SparkUpgradeException: [INCONSISTENT_BEHAVIOR_CROSS_VERSION.PARSE_DATETIME_BY_NEW_PARSER] You may get a different result due to the upgrading to Spark >= 3.0:
With CORRECTED Spark returns nulls for both records.
It does, however, parse the record with ID 1 correctly when I change the pattern to hh (12-hour clock) instead of HH (24-hour clock), since your values carry an AM/PM marker; see the short comparison below.
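A minimal sketch of that difference using to_timestamp (the demo DataFrame and the column name raw are just illustrative; CORRECTED policy assumed):
from pyspark.sql import functions as F

spark.conf.set("spark.sql.legacy.timeParserPolicy", "CORRECTED")

demo = spark.createDataFrame([("03-02-2003 08:37:55.671 PM",)], ["raw"])
demo.select(
    # hh is the 12-hour clock, so it combines with the AM/PM marker -> 20:37:55.671
    F.to_timestamp("raw", "dd-MM-yyyy hh:mm:ss.SSS a").alias("hh_pattern"),
    # HH is the 24-hour clock; together with an AM/PM marker the parse should fail
    # and return null, matching the nulls the CSV read produced above
    F.to_timestamp("raw", "dd-MM-yyyy HH:mm:ss.SSS a").alias("HH_pattern"),
).show(truncate=False)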
So with something like this (TIMESTAMP_COL as TimestampType(), _corrupt_record added to the schema, and hh in the pattern):
from pyspark.sql.types import *
spark.conf.set("spark.sql.legacy.timeParserPolicy","CORRECTED")
sch = StructType([StructField("ID",StringType(),False),StructField("TIMESTAMP_COL",TimestampType(),True), StructField("_corrupt_record", StringType(),True)])
df = spark.read \
.format("csv") \
.option("encoding", "utf-8") \
.option("mode", "PERMISSIVE") \
.option("header", "true") \
.option("dateFormat", "dd-MM-yyyy") \
.option("timestampFormat", "dd-MM-yyyy hh:mm:ss.SSS a") \
.option("delimiter", "|") \
.option("columnNameOfCorruptRecord", "_corrupt_record") \
.schema(sch) \
.load("dbfs:/FileStore/tables/stack.csv") \
df.show(truncate = False)
I am able to get this output:
+---+-----------------------+----------------------------+
|ID |TIMESTAMP_COL          |_corrupt_record             |
+---+-----------------------+----------------------------+
|1  |2003-02-03 20:37:55.671|null                        |
|2  |null                   |2|2003-02-03 08:37:55.671 AM|
+---+-----------------------+----------------------------+
I am getting null here because that is how the Spark parser works in PERMISSIVE mode: when a value does not match the pattern it is set to null, and the raw line lands in the _corrupt_record column (now that it is part of the schema). So if you just want to drop the timestamps that do not match, you can filter out the nulls.
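For example, a small sketch (column names as in the read above; valid and rejected are just illustrative names):
from pyspark.sql import functions as F

# Keep only the rows whose timestamp parsed successfully
valid = df.filter(F.col("TIMESTAMP_COL").isNotNull())

# Or look at the rejected rows through the corrupt-record column;
# caching first may be needed, because Spark disallows queries on a CSV source
# that reference only the internal corrupt record column
df.cache()
rejected = df.filter(F.col("_corrupt_record").isNotNull())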
Edit: As mentioned in the comments, I was missing the _corrupt_record column in the schema; it is added now, so you can get the corrupted value if you need it.