Insert Spark dataframe to partitioned table

Time:04-23

I have seen methods for inserting into a Hive table, such as insertInto(table_name, overwrite=True), but I couldn't work out how to handle the scenario below.

On the first run, a dataframe like this needs to be saved to a table partitioned by 'date_key'. There could be one or more partitions, e.g. 202201 and 202203.

+---+----------+
| id|  date_key|
+---+----------+
|  1|202201    |
|  2|202203    |
|  3|202201    |
+---+----------+

On subsequent runs, the data arrives in the same shape, and I'd like to append the new rows to their corresponding partitions based on date_key.

+---+----------+
| id|  date_key|
+---+----------+
|  4|202204    |
|  5|202203    |
|  6|202204    |
+---+----------+

Could you please shed some light on how to handle the following cases?

  1. Each run only brings new data for one partition.
  2. Each run brings new data for multiple partitions, as in the sample inputs above.

Many thanks for your help. Let me know if I can explain the problem better.

Edit: I could not use df.write.partitionBy("date_key").insertInto(table_name), because it raised an error saying that insertInto cannot be used together with partitionBy.
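
To make the desired outcome concrete, here is a small plain-Python sketch (not Spark) of which partition each row from the two sample runs should end up in. The helper names group_by_partition and append_run are hypothetical, and date_key is assumed to be a string; the point is only to show the expected layout after both runs.

```python
from collections import defaultdict


def group_by_partition(rows, key="date_key"):
    """Group row ids by partition value (one bucket per date_key)."""
    layout = defaultdict(list)
    for row in rows:
        layout[row[key]].append(row["id"])
    return dict(layout)


def append_run(layout, rows, key="date_key"):
    """Return a new layout with one run's rows appended to their partitions."""
    merged = {part: list(ids) for part, ids in layout.items()}
    for part, ids in group_by_partition(rows, key).items():
        merged.setdefault(part, []).extend(ids)
    return merged


# Sample inputs from the question (date_key assumed to be a string).
first_run = [
    {"id": 1, "date_key": "202201"},
    {"id": 2, "date_key": "202203"},
    {"id": 3, "date_key": "202201"},
]
second_run = [
    {"id": 4, "date_key": "202204"},
    {"id": 5, "date_key": "202203"},
    {"id": 6, "date_key": "202204"},
]

after_first = append_run({}, first_run)
after_second = append_run(after_first, second_run)
print(after_second)
```

After the second run, partition 202203 holds rows from both runs, 202204 is new, and 202201 is untouched. This is the behaviour the table write should reproduce.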

CodePudding user response:

In my example here, the first run creates a new partitioned table named data, with c2 as the partition column.

df1 = spark.createDataFrame([
    (1, 'a'),
    (2, 'b'),
], 'c1 int, c2 string')
df1.show()
df1.write.partitionBy('c2').mode('overwrite').saveAsTable('data')

/
  c2=a
    part-00000-7810a4aa-a5a1-4c4f-a09a-ef86a66041c9.c000.snappy.parquet
  c2=b
    part-00000-7810a4aa-a5a1-4c4f-a09a-ef86a66041c9.c000.snappy.parquet

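For the second run you don't need anything fancy: just use append mode with insertInto. Spark already knows that c2 is the partition column and will route each row to the right partition, so you don't have to tell it via partitionBy. Note that insertInto matches columns by position rather than by name, so the dataframe's columns must be in the same order as the table's.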

df2 = spark.createDataFrame([
    (1, 'a'),
    (3, 'c'),
], 'c1 int, c2 string')
df2.show()
df2.write.mode('append').insertInto('data')

/
  c2=a
    part-00000-7810a4aa-a5a1-4c4f-a09a-ef86a66041c9.c000.snappy.parquet
    part-00000-dcd9029e-8c65-4397-bca5-ab2691ece7ff.c000.snappy.parquet
  c2=b
    part-00000-7810a4aa-a5a1-4c4f-a09a-ef86a66041c9.c000.snappy.parquet
  c2=c
    part-00000-dcd9029e-8c65-4397-bca5-ab2691ece7ff.c000.snappy.parquet
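
Because insertInto resolves columns by position, a dataframe whose columns are in a different order than the table's can silently write values into the wrong columns. The following is a minimal sketch of one way to guard against that, assuming the dataframe and the table have exactly the same column names (including case). The helpers columns_in_table_order and append_by_position are hypothetical names, not part of the Spark API; spark and df are the usual SparkSession and DataFrame objects passed in by the caller.

```python
def columns_in_table_order(df_columns, table_columns):
    """Return the table's column order, after checking that the dataframe
    has exactly the same set of columns (assumption: names match exactly)."""
    if len(df_columns) != len(table_columns) or set(df_columns) != set(table_columns):
        raise ValueError(
            f"column mismatch: dataframe has {list(df_columns)}, "
            f"table has {list(table_columns)}"
        )
    return list(table_columns)


def append_by_position(spark, df, table_name):
    """Reorder df to the table's column order, then append with insertInto."""
    # spark.table(...).columns gives the existing table's column order.
    table_columns = spark.table(table_name).columns
    ordered = columns_in_table_order(df.columns, table_columns)
    # No partitionBy here: the partitioning comes from the table definition.
    df.select(*ordered).write.mode("append").insertInto(table_name)
```

With the example above, the second run would become append_by_position(spark, df2, 'data'). It works the same way whether the run contains rows for one partition or for several.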

CodePudding user response:

If the table is an external table, you can use the following code to write the data out to the external partitioned table:

df.write.partitionBy("date_key").mode("append").option("path","/path/to/external/table/on/hdfs").saveAsTable("table_name_here")

If it is a Hive managed table, you can simply use the saveAsTable API as follows:

df.write.partitionBy("date_key").mode("append").saveAsTable("tableName")
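
The two variants above differ only in the path option, so they can be folded into one small helper. This is a sketch rather than a definitive implementation: append_partitioned is a hypothetical name, and the path argument stands in for whatever external location the table uses. According to the Spark documentation, saveAsTable in append mode matches columns by name (unlike insertInto), and append mode also creates the table when it does not exist yet, so the same call should cover both the first and the subsequent runs.

```python
def append_partitioned(df, table_name, partition_col, path=None):
    """Append df to a partitioned table via saveAsTable.

    path is only needed for an external table; leave it as None for a
    managed table.
    """
    writer = df.write.partitionBy(partition_col).mode("append")
    if path is not None:
        # External table: point the writer at the table's storage location.
        writer = writer.option("path", path)
    writer.saveAsTable(table_name)
```

For the question's data this would be called as append_partitioned(df, "tableName", "date_key") on every run, regardless of how many distinct date_key values the run contains.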