I have an issue with Spark 2.4.7. I run a job that reads a table from a database and generates a Parquet file. The file should contain a timestamp and some decimal values. By default the timestamp was written to the Parquet file as INT96. Once I set spark.sql.parquet.writeLegacyFormat false, Spark started to write the timestamp as a number.
But for some reason setting spark.sql.parquet.writeLegacyFormat false does not change Spark's behavior with decimals. As per the documentation it should work:
https://spark.apache.org/docs/latest/configuration.html
spark-defaults.conf:
spark.sql.parquet.writeLegacyFormat false
spark.sql.parquet.outputTimestampType TIMESTAMP_MILLIS
spark.master yarn
Output:
timestamp = 1657498168000
...
rate = AAAAAAAAAAAAAAAAAiVRAA==
What can affect this parameter? I have tried to set it in spark-submit config, but same result - no change.
CodePudding user response:
If you look at how version 2.4.7 reads spark.sql.parquet.writeLegacyFormat in SQLConf.scala, you see that it is read into the PARQUET_WRITE_LEGACY_FORMAT value of the SQLConf object, with a default value of false.
You seem to be using the default value, so that would explain why you see no difference in behaviour. Try setting it to true to see whether you get the behaviour you want. Of course I'm not sure what output you really want, but I would expect some difference.
Hope this helps!
EDIT: If you're interested in how and where these parameters affect the writing of Parquet, the SparkToParquetSchemaConverter class is interesting, particularly lines 372 to 448. You'll see which config affects which type is used for timestamps and decimals.
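To rule out the possibility that the settings never reach the session, the effective values can be printed at runtime. The following is only a sketch: the object name, the local master, the column names and the output path /tmp/decimal_check are all made up for illustration, and it assumes Spark SQL is on the classpath.

```scala
import org.apache.spark.sql.SparkSession

object ParquetConfCheck {
  def main(args: Array[String]): Unit = {
    // Local session for experimenting; the job in the question runs on YARN instead.
    val spark = SparkSession.builder()
      .appName("parquet-conf-check")
      .master("local[*]")
      .config("spark.sql.parquet.writeLegacyFormat", "false")
      .config("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MILLIS")
      .getOrCreate()

    // Print the values the session actually resolved.
    Seq(
      "spark.sql.parquet.writeLegacyFormat",
      "spark.sql.parquet.outputTimestampType"
    ).foreach(key => println(s"$key = ${spark.conf.get(key)}"))

    // One wide and one narrow decimal column, written to a hypothetical path.
    spark.sql(
      "SELECT CAST(0.36 AS DECIMAL(38, 8)) AS wide, CAST(0.36 AS DECIMAL(16, 8)) AS narrow"
    ).write.mode("overwrite").parquet("/tmp/decimal_check")

    spark.stop()
  }
}
```

Opening the written file with any Parquet schema viewer then shows which physical type each of the two columns received under the chosen settings.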
CodePudding user response:
I read Koedlt's advice and dug into the Spark code as well, and I found the reason for this behaviour.
According to https://github.com/apache/spark/blob/928f518574ea755b5ae7ec028354450b7d1ca7f4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala , Spark writes a decimal as FIXED_LEN_BYTE_ARRAY (the same physical type the legacy format uses) whenever its precision exceeds the limit of 18 digits, even when writeLegacyFormat is false.
    // ======================
    // Decimals (legacy mode)
    // ======================

    // Spark 1.4.x and prior versions only support decimals with a maximum precision of 18 and
    // always store decimals in fixed-length byte arrays. To keep compatibility with these older
    // versions, here we convert decimals with all precisions to `FIXED_LEN_BYTE_ARRAY` annotated
    // by `DECIMAL`.
    case DecimalType.Fixed(precision, scale) if writeLegacyParquetFormat =>
      Types
        .primitive(FIXED_LEN_BYTE_ARRAY, repetition)
        .as(LogicalTypeAnnotation.decimalType(scale, precision))
        .length(Decimal.minBytesForPrecision(precision))
        .named(field.name)

    // ========================
    // Decimals (standard mode)
    // ========================

    // Uses INT32 for 1 <= precision <= 9
    case DecimalType.Fixed(precision, scale)
        if precision <= Decimal.MAX_INT_DIGITS && !writeLegacyParquetFormat =>
      Types
        .primitive(INT32, repetition)
        .as(LogicalTypeAnnotation.decimalType(scale, precision))
        .named(field.name)

    // Uses INT64 for 1 <= precision <= 18
    case DecimalType.Fixed(precision, scale)
        if precision <= Decimal.MAX_LONG_DIGITS && !writeLegacyParquetFormat =>
      Types
        .primitive(INT64, repetition)
        .as(LogicalTypeAnnotation.decimalType(scale, precision))
        .named(field.name)

    // Uses FIXED_LEN_BYTE_ARRAY for all other precisions
    case DecimalType.Fixed(precision, scale) if !writeLegacyParquetFormat =>
      Types
        .primitive(FIXED_LEN_BYTE_ARRAY, repetition)
        .as(LogicalTypeAnnotation.decimalType(scale, precision))
        .length(Decimal.minBytesForPrecision(precision))
        .named(field.name)
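To make the effect of those case clauses easier to see, here is a small stand-alone model of the decision in plain Scala, without any Spark dependency. It is a sketch, not Spark's code: DecimalLayout, physicalType and this version of minBytesForPrecision are names I made up, and the byte-count rule is my assumption of what Decimal.minBytesForPrecision computes (the smallest signed width that can hold any value with that many digits).

```scala
object DecimalLayout {
  // Limits named in the quoted comments: INT32 holds up to 9 digits, INT64 up to 18.
  val MaxIntDigits = 9
  val MaxLongDigits = 18

  // Smallest byte count whose signed range can hold any value with `precision` digits.
  // Assumption: this mirrors the intent of Spark's Decimal.minBytesForPrecision.
  def minBytesForPrecision(precision: Int): Int = {
    var numBytes = 1
    while (BigInt(2).pow(8 * numBytes - 1) < BigInt(10).pow(precision)) numBytes += 1
    numBytes
  }

  // Physical type chosen for a decimal, following the order of the case clauses above.
  def physicalType(precision: Int, legacy: Boolean): String =
    if (legacy) s"FIXED_LEN_BYTE_ARRAY(${minBytesForPrecision(precision)})"
    else if (precision <= MaxIntDigits) "INT32"
    else if (precision <= MaxLongDigits) "INT64"
    else s"FIXED_LEN_BYTE_ARRAY(${minBytesForPrecision(precision)})"

  def main(args: Array[String]): Unit = {
    // Compare both modes for a few precisions, including 16 and 38 from this thread.
    for (precision <- Seq(9, 16, 18, 38); legacy <- Seq(false, true))
      println(s"precision=$precision legacy=$legacy -> ${physicalType(precision, legacy)}")
  }
}
```

For precision 38 both modes end up with a 16-byte FIXED_LEN_BYTE_ARRAY, which is why toggling the flag makes no visible difference for such a column, while precision 16 switches between INT64 and a 7-byte array.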
So I realized that my decimal columns are declared with a precision of more than 18 digits. Here is an excerpt of the SQL that creates the table the Spark job reads:
column1 Nullable( Decimal (38, 8)),
Once I changed it to Decimal (16, 8), the values were written as plain integers, which is what I wanted:
column1 = 1001850000000000
column2 = 954506506270500
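For anyone wondering how to read these raw values, here is a small sketch in plain Scala. It rests on assumptions: that the viewer prints binary fields as Base64, that rate also has scale 8 like column1, and that the stored integer is the unscaled decimal value. RawValues and its helper names are made up for the example.

```scala
import java.time.Instant
import java.util.Base64

object RawValues {
  // Assumption: every decimal column discussed here has scale 8.
  val Scale = 8

  // FIXED_LEN_BYTE_ARRAY decimals hold the unscaled value as big-endian two's complement.
  def decodeFixedLenDecimal(base64: String, scale: Int): BigDecimal = {
    val bytes = Base64.getDecoder.decode(base64)
    BigDecimal(BigInt(bytes), scale)
  }

  // INT32/INT64 decimals hold the same unscaled value directly as an integer.
  def decodeIntDecimal(unscaled: Long, scale: Int): BigDecimal =
    BigDecimal(BigInt(unscaled), scale)

  def main(args: Array[String]): Unit = {
    // Byte length of the `rate` field from the question.
    println(Base64.getDecoder.decode("AAAAAAAAAAAAAAAAAiVRAA==").length)
    // The same field interpreted as a decimal with scale 8.
    println(decodeFixedLenDecimal("AAAAAAAAAAAAAAAAAiVRAA==", Scale))
    // The integer values written after switching to Decimal (16, 8).
    println(decodeIntDecimal(1001850000000000L, Scale))
    println(decodeIntDecimal(954506506270500L, Scale))
    // The TIMESTAMP_MILLIS value from the question as a UTC instant.
    println(Instant.ofEpochMilli(1657498168000L))
  }
}
```

Under those assumptions the rate field decodes to 16 bytes holding 0.36, column1 reads as 10018500 and column2 as 9545065.062705, so the data is the same in both layouts and only the physical encoding differs.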