How to merge part files in HDFS?


What I want

I have 17 TB of date-partitioned data in a directory laid out like this:

/data_folder
  /date=2021.01.01
    /part-00002-f0b91523-6e0c-4adc-88cc-e9451614791d.c000.snappy.parquet
    /part-00002-f0193442-c20e-49d2-bde1-70053ae2a254.c000.snappy.parquet
    /... over 9000 part files 
  /date=2021.01.02
    /part-00002-bdb50c33-fd32-4e87-9edb-cec77973760b.c000.snappy.parquet
    /part-00001-e2cd906e-5669-46d7-92e9-7498ed60487f.c000.snappy.parquet
    /... over 9000 part files 

I want to make it look like this:

/data_folder
  /date=2021.01.01
    /merge.parquet
  /date=2021.01.02
    /merge.parquet

I want this because I've heard that HDFS works better with a small number of large files than with a large number of small files. My queries have become very slow, and I hope this optimization will speed them up.

What I do

So I run commands like these:

hdfs dfs -getmerge /data_folder/date=2021.01.01 merge.parquet;
hdfs dfs -copyFromLocal -f -t 4 merge.parquet /merged/date=2021.01.01/merge.parquet;

I got the directory structure I wanted, but now I can't read the files. The query:

%spark2.spark

val date = "2021.01.01"


val ofdCheques2Uniq = spark.read
    .parquet(s"/projects/khajiit/data/OfdCheques2/date=$date")
    .withColumn("chequeId", concat($"content.cashboxRegNumber", lit("_"), $"content.number", lit("_"), col("content.timestamp")))
    .dropDuplicates("chequeId")
    
val ofdChequesTempUniq = spark.read
    .parquet(s"/projects/khajiit/data/OfdChequesTemp/date=$date")
    .withColumn("chequeId", concat($"content.cashboxRegNumber", lit("_"), $"content.number", lit("_"), col("content.timestamp")))
    .dropDuplicates("chequeId")

println(s"OfdCheques2   : ${ofdCheques2Uniq.count} unique cheques")
println(s"OfdChequesTemp: ${ofdChequesTempUniq.count} unique cheques")

Prints:

OfdCheques2   : 4309 unique cheques
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 74.0 failed 4 times, most recent failure: Lost task 0.3 in stage 74.0 (TID 1720, srs-st-hdp-s3.dev.kontur.ru, executor 1): java.io.IOException: can not read class org.apache.parquet.format.PageHeader: don't know what type: 13

Meanwhile, this query:

val ofdCheques2Uniq = spark.read
    .parquet(s"/projects/khajiit/data/OfdCheques2/date=$date")
    
val ofdChequesTempUniq = spark.read
    .parquet(s"/projects/khajiit/data/OfdChequesTemp/date=$date")

println(s"OfdCheques2   : ${ofdCheques2Uniq.count} unique cheques")
println(s"OfdChequesTemp: ${ofdChequesTempUniq.count} unique cheques")

Prints:

OfdCheques2   : 5290 unique cheques
OfdChequesTemp: 18 unique cheques

Finally, the questions

  1. Is the getmerge command applicable to my problem? If so, what did I do wrong?
  2. What is the best way to solve this problem?

CodePudding user response:

"I got the directory structure I wanted, but now I can't read the files"

This is due to the binary structure of Parquet files. Each file carries header/footer metadata that stores the schema, the row-group offsets, and the number of records in that file, so simply concatenating files corrupts that metadata. getmerge is therefore really only useful for row-delimited, non-binary data formats.
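To make that concrete, here is a small sketch (not from the original answer) that inspects the tail of the merged file. A valid Parquet file ends with its footer, a 4-byte little-endian footer length, and the magic bytes PAR1; after getmerge, only the last part file's footer survives at the end of the output, and its row-group offsets now point into bytes that belong to the other concatenated files, which is why the reader hits nonsense page headers. The path below is hypothetical, and a Spark session (as in the question's notebook) is assumed for the Hadoop configuration.

import java.nio.{ByteBuffer, ByteOrder}
import org.apache.hadoop.fs.{FileSystem, Path}

val fs   = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val path = new Path("/merged/date=2021.01.01/merge.parquet") // hypothetical path
val len  = fs.getFileStatus(path).getLen

// Read the last 8 bytes: 4-byte footer length + 4-byte magic.
val in  = fs.open(path)
val buf = new Array[Byte](8)
in.readFully(len - 8, buf)
in.close()

val magic      = new String(buf.slice(4, 8), "US-ASCII") // "PAR1" for a well-formed file
val footerSize = ByteBuffer.wrap(buf, 0, 4).order(ByteOrder.LITTLE_ENDIAN).getInt
println(s"magic=$magic footerSize=$footerSize")
// The tail still looks valid, but the footer describes only the last original
// part file; offsets for everything else in the concatenated file are lost.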

What you can do instead is read the whole folder with spark.read.parquet("/data_folder"), then repartition or coalesce that DataFrame, and write it out to a new "merged" output location, as in the sketch below.
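A minimal sketch of that compaction approach, assuming the paths from the question; the output location and the repartitioning choice are assumptions to adapt to your cluster.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().appName("compact-parquet").getOrCreate()

spark.read
  .parquet("/data_folder")           // discovers the date=... partitions automatically
  .repartition(col("date"))          // all rows of one date end up in a single task...
  .write
  .partitionBy("date")               // ...so each date=... directory gets a single file
  .mode("overwrite")
  .parquet("/data_folder_merged")    // write to a new location, then swap directories

If one file per date is too large, use repartition(n) or coalesce(n) instead and aim for roughly 128 MB to 1 GB per output file. Either way Spark writes correct footers for the new files, which getmerge cannot do.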

Another alternative is Apache Gobblin - https://gobblin.apache.org/docs/user-guide/Compaction/
