Read specific files and merge/union these schema evolutionized files into single Spark dataframe


I'm new to Spark, and I'm trying to solve the problem described below.
Scenario
I am trying to read multiple Parquet files (and possibly CSV files later on) and load them into a single Spark dataframe in Python, for a specific range of dates. I'll explain the condition for selecting the dates later.
Reason: schema evolution - new columns were added in the most recent partitions, so a plain union is not possible (or I'm unaware of how to do it). If there is an efficient way to do the union, please let me know about that as well.
Files look like this:

s3://dir1/dir2/dir3/files/partition_date=2020-12-25/
# Partitions do not exist for weekend days, i.e., Saturday and Sunday
s3://dir1/dir2/dir3/files/partition_date=2020-12-28/
s3://dir1/dir2/dir3/files/partition_date=2020-12-29/
s3://dir1/dir2/dir3/files/partition_date=2020-12-30/  # Consider this file with new columns in it
s3://dir1/dir2/dir3/files/partition_date=2020-12-31/  # Consider this file with new columns in it

The Parquet files (and CSV files, in a different folder) that reside in each of these partition folders look like this:

s3://dir1/dir2/dir3/files/partition_date=2020-12-31/data_2020-12-31.parquet

Before the schema changed, I used to load everything (all of the partitions) that existed in the folder s3://dir1/dir2/dir3/files into a single Spark dataframe with the line below:

spark_df = spark.read.format('parquet').load('s3://dir1/dir2/dir3/files')

But now I want to pull files only for specific dates, since a contiguous date range won't work because of the missing partitions. So I built a list with a for loop that checks which partitions exist (one possible sketch is shown after the snippet below). This list contains strings of all the dates for which a partition exists.

dates = ['2020-12-25','2020-12-26','2020-12-27','2020-12-28','2020-12-29','2020-12-30','2020-12-31'] 
# I'll retrieve these dates in a more efficient way later on
existing_dates = []
# 'for' loop implementation (omitted); the result looks like this:
existing_dates = ['2020-12-25','2020-12-28','2020-12-29','2020-12-30','2020-12-31']
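
For reference, one way to implement that for loop is to ask S3 which partition prefixes actually contain objects. This is a minimal sketch (not from the original post), assuming boto3 is installed and credentials are configured; the bucket and prefix names are simply taken from the example paths above and are illustrative.

import boto3

s3 = boto3.client('s3')
bucket = 'dir1'              # illustrative: bucket name from the example path
prefix = 'dir2/dir3/files/'  # illustrative: key prefix from the example path

existing_dates = []
for d in dates:
    # MaxKeys=1 is enough: we only need to know whether anything exists under the prefix
    resp = s3.list_objects_v2(Bucket=bucket, Prefix=f'{prefix}partition_date={d}/', MaxKeys=1)
    if resp.get('KeyCount', 0) > 0:
        existing_dates.append(d)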

So here's the task for me:

  1. I want to pull data just for dates that exist in existing_dates
  2. I also need to merge the older partitions with the schema-evolved partitions that contain the new additional columns (namely 2020-12-30 and 2020-12-31 in this example)
  3. I also need to check whether a partition's parquet file is empty or not! I came across this answer, but I don't know the equivalent code in PySpark.

CodePudding user response:

  1. You can use the {} glob syntax to read specific partitions.
base_path = 's3://dir1/dir2/dir3/files'

# Note 1: The doubled {{ in the f-string produces a literal {.
# Note 2: By default, reading specific partition paths drops the partition column (partition_date) from the returned dataframe.
#         To keep the partition_date column, add the basePath option pointing at the root of your parquet data.
df = (spark.read
      .option('basePath', base_path)
      .parquet(f'{base_path}/partition_date={{{",".join(existing_dates)}}}'))

# f'{base_path}/partition_date={{{",".join(existing_dates)}}}' = 
# s3://dir1/dir2/dir3/files/partition_date={2020-12-25,2020-12-28,...}

FYI, the other glob syntax, [], can capture ranges.

s3://dir1/dir2/dir3/files/partition_date=2020-12-2[5-8]

will capture partitions for 2020-12-25, 2020-12-26, 2020-12-27, 2020-12-28.
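
Applied with the same reader options as above, that would look like this (a sketch; only partitions that actually exist in the range should be matched by the glob):

df_range = (spark.read
            .option('basePath', base_path)
            .parquet(f'{base_path}/partition_date=2020-12-2[5-8]'))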

  2. When you read partitions that are missing columns together with partitions that have the additional columns, use the mergeSchema option to align all columns.
df = (spark.read
      .option('basePath', base_path)
      .option('mergeSchema', 'true')  # this should handle the missing columns
      .parquet(f'{base_path}/partition_date={{{",".join(existing_dates)}}}'))
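
As a quick sanity check (a sketch using the same names as above), the merged dataframe should show the new columns in its schema, with null values in rows that come from the older partitions:

df.printSchema()                             # new columns appear in the merged schema
df.groupBy('partition_date').count().show()  # one row count per loaded partition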
  3. I am not sure what the purpose of this is yet. What would you like to do after identifying it? I am asking because, depending on the task, you may not need to check whether a parquet file is empty at all.
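
If you do find that you need the emptiness check, a minimal PySpark sketch (not part of the original answer) is to read the single partition and test whether it returns any rows:

from pyspark.sql.utils import AnalysisException

def partition_is_empty(spark, base_path, date_str):
    # Returns True if the partition for date_str has no rows or cannot be read.
    try:
        part_df = spark.read.parquet(f'{base_path}/partition_date={date_str}')
    except AnalysisException:
        # The path does not exist or contains no readable parquet files.
        return True
    # head(1) avoids a full count; it returns an empty list when there are no rows.
    return len(part_df.head(1)) == 0

non_empty_dates = [d for d in existing_dates if not partition_is_empty(spark, base_path, d)]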