I have seen methods for inserting into a Hive table, such as insertInto(table_name, overwrite=True), but I couldn't work out how to handle the scenario below.
For the first run, a dataframe like this needs to be saved in a table, partitioned by 'date_key'. There could be one or more partitions, e.g. 202201 and 202203:
+---+--------+
| id|date_key|
+---+--------+
|  1|  202201|
|  2|  202203|
|  3|  202201|
+---+--------+
For subsequent runs, the data also comes in like this, and I'd like to append the new data to the corresponding partitions using date_key (a small snippet constructing both sample dataframes follows this table):
+---+--------+
| id|date_key|
+---+--------+
|  4|  202204|
|  5|  202203|
|  6|  202204|
+---+--------+
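For reference, the two sample inputs above could be built roughly like this (just a sketch, assuming an active SparkSession named spark; the column names come from the tables above):

# First-run data: rows for partitions 202201 and 202203.
df_first = spark.createDataFrame(
    [(1, '202201'), (2, '202203'), (3, '202201')],
    'id int, date_key string',
)

# Subsequent-run data: new rows for partitions 202204 and 202203.
df_next = spark.createDataFrame(
    [(4, '202204'), (5, '202203'), (6, '202204')],
    'id int, date_key string',
)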
Could you please shed some light on how to handle this:
- if during each run there is only new data for one partition?
- if during each run there is new data for multiple partitions, like the sample inputs above?
Many thanks for your help. Let me know if I can explain the problem better.
Edited:
I could not use df.write.partitionBy("date_key").insertInto(table_name), as there was an error saying insertInto cannot be used together with partitionBy.
CodePudding user response:
In my example here, the first run will create a new partitioned table called data. c2 is the partition column.
# First run: create the partitioned table from scratch.
df1 = spark.createDataFrame([
    (1, 'a'),
    (2, 'b'),
], 'c1 int, c2 string')
df1.show()

df1.write.partitionBy('c2').mode('overwrite').saveAsTable('data')
/
  c2=a
    part-00000-7810a4aa-a5a1-4c4f-a09a-ef86a66041c9.c000.snappy.parquet
  c2=b
    part-00000-7810a4aa-a5a1-4c4f-a09a-ef86a66041c9.c000.snappy.parquet
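To double-check the first run, something like this should list the partitions Spark registered and the rows written (a sketch, assuming the same spark session):

# Show the partitions of the newly created table.
spark.sql('SHOW PARTITIONS data').show()

# Read the table back to verify its contents.
spark.table('data').show()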
For the second run, you don't need anything fancy, just append and insertInto. Spark knows that c2 is the partition column and will handle it properly; you don't have to tell it via partitionBy. Note that insertInto resolves columns by position rather than by name, so the dataframe's columns must be in the same order as the table schema (partition column last here):
# Second run: append new rows; the existing partition c2='a' is reused,
# and a new partition is created for c2='c'.
df2 = spark.createDataFrame([
    (1, 'a'),
    (3, 'c'),
], 'c1 int, c2 string')
df2.show()

df2.write.mode('append').insertInto('data')
/
  c2=a
    part-00000-7810a4aa-a5a1-4c4f-a09a-ef86a66041c9.c000.snappy.parquet
    part-00000-dcd9029e-8c65-4397-bca5-ab2691ece7ff.c000.snappy.parquet
  c2=b
    part-00000-7810a4aa-a5a1-4c4f-a09a-ef86a66041c9.c000.snappy.parquet
  c2=c
    part-00000-dcd9029e-8c65-4397-bca5-ab2691ece7ff.c000.snappy.parquet
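Reading the table back after the second run should show rows from both writes, each sitting in the partition matching its c2 value (again just a sketch with the same session):

# Rows from both runs, ordered for readability.
spark.table('data').orderBy('c2', 'c1').show()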
CodePudding user response:
If the table is an external table, you can use the following code to write the data out to the external partitioned table:
df.write.partitionBy("date_key").mode("append").option("path","/path/to/external/table/on/hdfs").saveAsTable("table_name_here")
If it is a Hive managed table, then you can simply use the saveAsTable API as follows:
df.write.partitionBy("date_key").mode("append").saveAsTable("tableName")