I frequently have to write DataFrames out as Hive tables:
df.write.mode('overwrite').format('hive').saveAsTable(f'db.{file_nm}_PT')
Or I use Spark SQL / Hive SQL to copy one table to another as a backup:
INSERT OVERWRITE TABLE db.tbl_bkp PARTITION (op_cd, rpt_dt)
SELECT * FROM db.tbl;
The problem is: writing to the Hive staging directory takes about 25% of the total time, whereas 75% or more of the time goes into moving the ORC files from the staging directory into the final partitioned directory structure.
21/11/13 00:51:25 INFO hive.ql.metadata.Hive: New loading path = gs://sam_tables/teradata/tbl_bkp/.hive-staging_hive_2021-11-12_23-26-38_441_6664318328991520567-1/-ext-10000/rpt_dt=2019-10-24 with partSpec {rpt_dt=2019-10-24}
21/11/13 00:51:56 INFO hive.ql.metadata.Hive: Replacing src:gs://sam_tables/teradata/tbl_bkp/.hive-staging_hive_2021-11-12_23-26-38_441_6664318328991520567-1/-ext-10000/rpt_dt=2018-02-18/part-00058-95ee77f0-4e27-4765-a454-e5009c4f33f3.c000, dest: gs://sam_tables/teradata/tbl_bkp/rpt_dt=2018-02-18/part-00058-95ee77f0-4e27-4765-a454-e5009c4f33f3.c000, Status:true
21/11/13 00:51:56 INFO hive.ql.metadata.Hive: New loading path = gs://sam_tables/teradata/tbl_bkp/.hive-staging_hive_2021-11-12_23-26-38_441_6664318328991520567-1/-ext-10000/rpt_dt=2018-02-18 with partSpec {rpt_dt=2018-02-18}
21/11/13 00:52:31 INFO hive.ql.metadata.Hive: Replacing src:gs://sam_tables/teradata/tbl_bkp/.hive-staging_hive_2021-11-12_23-26-38_441_6664318328991520567-1/-ext-10000/rpt_dt=2019-01-29/part-00046-95ee77f0-4e27-4765-a454-e5009c4f33f3.c000, dest: gs://sam_tables/teradata/tbl_bkp/rpt_dt=2019-01-29/part-00046-95ee77f0-4e27-4765-a454-e5009c4f33f3.c000, Status:true
21/11/13 00:52:31 INFO hive.ql.metadata.Hive: New loading path = gs://sam_tables/teradata/tbl_bkp/.hive-staging_hive_2021-11-12_23-26-38_441_6664318328991520567-1/-ext-10000/rpt_dt=2019-01-29 with partSpec {rpt_dt=2019-01-29}
21/11/13 00:53:09 INFO hive.ql.metadata.Hive: Replacing src:gs://sam_tables/teradata/tbl_bkp/.hive-staging_hive_2021-11-12_23-26-38_441_6664318328991520567-1/-ext-10000/rpt_dt=2020-08-01/part-00020-95ee77f0-4e27-4765-a454-e5009c4f33f3.c000, dest: gs://sam_tables/teradata/tbl_bkp/rpt_dt=2020-08-01/part-00020-95ee77f0-4e27-4765-a454-e5009c4f33f3.c000, Status:true
21/11/13 00:53:09 INFO hive.ql.metadata.Hive: New loading path = gs://sam_tables/teradata/tbl_bkp/.hive-staging_hive_2021-11-12_23-26-38_441_6664318328991520567-1/-ext-10000/rpt_dt=2020-08-01 with partSpec {rpt_dt=2020-08-01}
21/11/13 00:53:46 INFO hive.ql.metadata.Hive: Replacing src:gs://sam_tables/teradata/tbl_bkp/.hive-staging_hive_2021-11-12_23-26-38_441_6664318328991520567-1/-ext-10000/rpt_dt=2021-07-12/part-00026-95ee77f0-4e27-4765-a454-e5009c4f33f3.c000, dest: gs://sam_tables/teradata/tbl_bkp/rpt_dt=2021-07-12/part-00026-95ee77f0-4e27-4765-a454-e5009c4f33f3.c000, Status:true
21/11/13 00:53:46 INFO hive.ql.metadata.Hive: New loading path = gs://sam_tables/teradata/tbl_bkp/.hive-staging_hive_2021-11-12_23-26-38_441_6664318328991520567-1/-ext-10000/rpt_dt=2021-07-12 with partSpec {rpt_dt=2021-07-12}
21/11/13 00:54:17 INFO hive.ql.metadata.Hive: Replacing src:gs://sam_tables/teradata/tbl_bkp/.hive-staging_hive_2021-11-12_23-26-38_441_6664318328991520567-1/-ext-10000/rpt_dt=2022-01-21/part-00062-95ee77f0-4e27-4765-a454-e5009c4f33f3.c000, dest: gs://sam_tables/teradata/tbl_bkp/rpt_dt=2022-01-21/part-00062-95ee77f0-4e27-4765-a454-e5009c4f33f3.c000, Status:true
21/11/13 00:54:17 INFO hive.ql.metadata.Hive: New loading path = gs://sam_tables/teradata/tbl_bkp/.hive-staging_hive_2021-11-12_23-26-38_441_6664318328991520567-1/-ext-10000/rpt_dt=2022-01-21 with partSpec {rpt_dt=2022-01-21}
21/11/13 00:54:49 INFO hive.ql.metadata.Hive: Replacing src:gs://sam_tables/teradata/tbl_bkp/.hive-staging_hive_2021-11-12_23-26-38_441_6664318328991520567-1/-ext-10000/rpt_dt=2018-01-20/part-00063-95ee77f0-4e27-4765-a454-e5009c4f33f3.c000, dest: gs://sam_tables/teradata/tbl_bkp/rpt_dt=2018-01-20/part-00063-95ee77f0-4e27-4765-a454-e5009c4f33f3.c000, Status:true
21/11/13 00:54:49 INFO hive.ql.metadata.Hive: New loading path = gs://sam_tables/teradata/tbl_bkp/.hive-staging_hive_2021-11-12_23-26-38_441_6664318328991520567-1/-ext-10000/rpt_dt=2018-01-20 with partSpec {rpt_dt=2018-01-20}
21/11/13 00:55:22 INFO hive.ql.metadata.Hive: Replacing src:gs://sam_tables/teradata/tbl_bkp/.hive-staging_hive_2021-11-12_23-26-38_441_6664318328991520567-1/-ext-10000/rpt_dt=2019-09-01/part-00037-95ee77f0-4e27-4765-a454-e5009c4f33f3.c000, dest: gs://sam_tables/teradata/tbl_bkp/rpt_dt=2019-09-01/part-00037-95ee77f0-4e27-4765-a454-e5009c4f33f3.c000, Status:true
This operation is quite fast on actual HDFS, but on Google Cloud Storage this "rename" is really a copy-and-delete of blobs and is very slow.
I have heard about direct-path writes; can anyone suggest how to do that?
CodePudding user response:
(not so) Short answer
It's ... complicated. Very complicated. I wanted to write a short answer but I'd risk being misleading on several points. Instead I'll try to give a very short summary of the very long answer.
- Hive uses staging directories for a good reason: atomicity. You don't want users reading a table while it is being re-written, so instead you write to a staging directory and rename the directory into place when the write is done, which is exactly what your logs show.
- The problem is that cloud storages are "object stores", not "distributed file systems" like HDFS, and some operations, such as renaming a folder, can be much slower because of that.
- Each cloud has its own storage implementation, with its own specificities and downsides, and over time they even propose new variants to overcome some of these downsides (e.g. Azure has 3 different storage variants: Blob Storage, Data Lake Storage Gen1 and Gen2).
- Therefore, the best solution on one cloud isn't necessarily the best on another cloud.
- The FileSystem API implementations for the various cloud storages are part of the Hadoop distribution that Spark uses, so the solutions available to you will also depend on which version of Hadoop your Spark installation is using.
- Azure/GCS only: You could try setting [this option](https://spark.apache.org/docs/3.1.1/cloud-integration.html#configuring):
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2
It is faster than v1, but it is also not recommended, as it is not atomic and therefore less safe in case of partial failures. v2 is currently the default in Hadoop, but Spark 3 set it back to v1 by default, and there are discussions in the Hadoop community about deprecating it and making v1 the default again. A sketch of setting this option is shown after this list.
- There is also some ongoing development of better output committers for Azure and GCS, based on a similar output committer done for S3.
- Alternatively, you can try switching to cloud-first formats like Apache Iceberg, Apache Hudi or Delta Lake.
- I am not very familiar with these yet, but a quick look at Delta Lake's documentation convinced me that they had to deal with the same kind of issues (cloud storages not being real file systems), and depending on which cloud you're on, it may require extra configuration, especially on GCP, where the feature is flagged as experimental (see the second sketch after this list).
- I am not very familiar with Apache Iceberg or Apache Hudi either, and I couldn't find any mention of them dealing with this kind of issue. It's unclear whether that is because their core mechanics let them avoid it or simply because nobody has raised the question yet. I'd have to dig further into their design to know for sure.
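Here is a minimal sketch of setting the committer option from PySpark; the app name is illustrative, and keep in mind that Hive's own staging-directory move (the "Replacing src:" steps in your logs) is a separate mechanism, so this mainly speeds up the job/task commit phase:
from pyspark.sql import SparkSession

# Minimal sketch: pass the v2 commit algorithm down to Hadoop's FileOutputCommitter.
# Faster on object stores, but not atomic, so a failed job can leave partial output behind.
spark = (
    SparkSession.builder
    .appName("tbl_bkp_copy")  # illustrative name
    .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
    .enableHiveSupport()
    .getOrCreate()
)

# Same backup statement as in the question.
spark.sql("INSERT OVERWRITE TABLE db.tbl_bkp PARTITION (op_cd, rpt_dt) SELECT * FROM db.tbl")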
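And a sketch of the Delta Lake alternative, assuming the delta-spark package and its session extensions are configured on the cluster (the target table name is hypothetical; the partition columns are taken from the question). Delta commits through a transaction log instead of renaming a staging directory, which is why it can sidestep the slow rename:
# Sketch only: requires the Delta Lake jars and SQL extensions on the cluster.
(df.write
   .format("delta")
   .mode("overwrite")
   .partitionBy("op_cd", "rpt_dt")    # same partition columns as the Hive table
   .saveAsTable("db.tbl_bkp_delta"))  # hypothetical table name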
Now, for the long answer, maybe I should write a blog article... I'll post it here whenever it's done.