When using wr.s3.to_parquet, I can construct a path with a formatted string literal (f-string), and I already have existing folders that follow this pattern.
from datetime import datetime

import awswrangler as wr
import pandas as pd


def SaveInS3_test(Ticker, Granularity, Bucket, df, keyPrefix=""):
    # Build a zero-padded year/month/day key from the current UTC time
    year, month, day = datetime.utcnow().strftime("%Y/%m/%d").split("/")
    path = f"s3://{Bucket}/{keyPrefix}{year}/{month}/{day}/{Ticker}/{Granularity}.parquet"
    print(path)
    wr.s3.to_parquet(df, path, index=True, dataset=True, mode="append")


df = pd.DataFrame({"col": [1, 2, 3]})
SaveInS3_test("GBP", "H1", "my_bucket", df, keyPrefix="Test/")
The path would then be something like this:
s3://my_bucket/Test/2022/08/06/GBP/H1.parquet
I would like to use the Athena/Glue database functionality of wrangler as follows (this works):
wr.s3.to_parquet(
    df=df,
    path="s3://my_bucket",
    dataset=True,
    database="default",  # Athena/Glue database
    table="my_table",  # Athena/Glue table
)
Can I combine my f-string approach to the path structure with this database functionality, so that files end up under a path like this?
s3://my_bucket/Test/2022/08/06/GBP/H1.parquet
I'm not sure how I would use partitions or something similar to do this.
Any attempt I make to pass such a path raises an InvalidArgumentValue error, because the path does not match the existing Glue catalog table path.
Answer:
Solution
from datetime import datetime

import awswrangler as wr


def save_in_s3(df, ticker, granularity, bucket, prefix):
    dt = datetime.utcnow()

    # Important! Create new partition cols
    df = df.assign(**{
        'ticker': ticker, 'granularity': granularity,
        'year': dt.year, 'month': dt.month, 'day': dt.day
    })

    # Write to s3
    wr.s3.to_parquet(
        df,
        path=f's3://{bucket}/{prefix}',
        table='table_name',
        database='database_name',
        index=False,
        dataset=True,
        partition_cols=['year', 'month', 'day', 'ticker', 'granularity']
    )
Worked out example
Let's assume we call the function on two different dates with different args:
# Called on 2022-08-06
save_in_s3(df, ticker='GBP', granularity='H1', bucket='my_bucket', prefix='my_folder')
# Called on 2022-08-07
save_in_s3(df, ticker='INR', granularity='H2', bucket='my_bucket', prefix='my_folder')
Now, assuming the database already exists in Glue, the above function calls will write the following files to S3:
s3://my_bucket/my_folder/year=2022/month=8/day=6/ticker=GBP/granularity=H1/*.parquet
s3://my_bucket/my_folder/year=2022/month=8/day=7/ticker=INR/granularity=H2/*.parquet
The corresponding Athena table will have the following partitions:
SHOW PARTITIONS "database_name"."table_name";
-- year=2022/month=8/day=6/ticker=GBP/granularity=H1
-- year=2022/month=8/day=7/ticker=INR/granularity=H2
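Essentially, wrangler creates the partitions and the S3 folder structure for you. Note that the folders use the Hive-style key=value layout shown above (for example year=2022/month=8/day=6) rather than the bare 2022/08/06 layout from the question, and the table path stays fixed at s3://{bucket}/{prefix}.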
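To make the resulting layout easier to reason about, here is a minimal sketch that reproduces the key=value prefix shown above from a set of partition values. The helper name hive_partition_prefix is hypothetical and is not part of awswrangler; it only mimics the folder naming seen in the example output and assumes the values are passed in the same order as partition_cols.

```python
from datetime import datetime, timezone


def hive_partition_prefix(root, partitions):
    """Build a Hive-style S3 prefix for one set of partition values.

    Hypothetical helper (not an awswrangler function). `partitions` maps
    column name -> value, in the same order as `partition_cols`.
    """
    root = root.rstrip("/")
    parts = "/".join(f"{col}={value}" for col, value in partitions.items())
    return f"{root}/{parts}/"


# Assumed call date, matching the first call in the worked example
dt = datetime(2022, 8, 6, tzinfo=timezone.utc)
prefix = hive_partition_prefix(
    "s3://my_bucket/my_folder",
    {
        "year": dt.year, "month": dt.month, "day": dt.day,
        "ticker": "GBP", "granularity": "H1",
    },
)
print(prefix)
# s3://my_bucket/my_folder/year=2022/month=8/day=6/ticker=GBP/granularity=H1/
```

Because dt.month and dt.day are integers, the folders are named month=8 and day=6 without zero padding, which matches the partitions listed above.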