AWS Data Wrangler s3.to_parquet replicate current S3 path structure

Time:08-08

When using wr.s3.to_parquet, I can build the path with a formatted string literal (f-string) so that it follows my existing folder pattern.

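from datetime import datetime

import awswrangler as wr
import pandas as pd


def SaveInS3_test(Ticker, Granularity, Bucket, df, keyPrefix=""):

    # Zero-padded UTC date parts, e.g. "2022", "08", "06"
    year, month, day = datetime.utcnow().strftime("%Y/%m/%d").split("/")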

    path = (
        f"s3://{Bucket}/{keyPrefix}{year}/{month}/{day}/{Ticker}/{Granularity}.parquet"
    )
    
    print(path)

    wr.s3.to_parquet(df, path, index=True, dataset=True, mode="append")


df=pd.DataFrame({'col': [1, 2, 3]})
SaveInS3_test("GBP", "H1","my_bucket", df, keyPrefix="Test/")

The path would then be something like this:

s3://my_bucket/Test/2022/08/06/GBP/H1.parquet

I would like to use the Athena/Glue database functionality of wrangler as follows (this works):

wr.s3.to_parquet(
    df=df,
    path='s3://my_bucket',
    dataset=True,
    database='default',  # Athena/Glue database
    table='my_table')  # Athena/Glue table

Can I combine my f-string approach to the path structure with this database functionality, so that the files end up under a path like this?

s3://my_bucket/Test/2022/08/06/GBP/H1.parquet

I'm not sure how I would use partitions or similar to do this.

Any attempt I make to pass such a path raises an InvalidArgumentValue, because the path does not match the existing Glue catalog table path.

Answer:

Solution

from datetime import datetime

import awswrangler as wr


def save_in_s3(df, ticker, granularity, bucket, prefix):
    dt = datetime.utcnow()

    # Important! Create new partition cols
    df = df.assign(**{
        'ticker': ticker, 'granularity': granularity,
        'year': dt.year, 'month': dt.month, 'day': dt.day
    })

    # Write to s3
    wr.s3.to_parquet(
        df,
        path=f's3://{bucket}/{prefix}',
        table='table_name',
        database='database_name',
        index=False,
        dataset=True,
        partition_cols=['year', 'month', 'day', 'ticker', 'granularity']
    )
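
One detail worth noting about the partition columns: dt.month and dt.day are plain integers, so the folder names come out unpadded (month=8 rather than month=08). The sketch below isolates that step using only the standard library. The helper name partition_values and its zero_pad flag are hypothetical, not part of awswrangler, and the padded variant assumes you are happy for those partition columns to be strings.

```python
from datetime import datetime, timezone


def partition_values(dt, ticker, granularity, zero_pad=False):
    """Return the partition column values for one write (hypothetical helper)."""
    if zero_pad:
        # Strings keep the leading zero, e.g. "08" and "06"
        year, month, day = dt.strftime("%Y/%m/%d").split("/")
    else:
        # Integers, as in the answer above, e.g. 8 and 6
        year, month, day = dt.year, dt.month, dt.day
    return {
        "year": year,
        "month": month,
        "day": day,
        "ticker": ticker,
        "granularity": granularity,
    }


dt = datetime(2022, 8, 6, tzinfo=timezone.utc)
print(partition_values(dt, "GBP", "H1"))
print(partition_values(dt, "GBP", "H1", zero_pad=True))
```

The resulting dict could be passed to df.assign(**values) in place of the literal dict used in save_in_s3.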

Worked example

Let's assume we call the function on two different dates with different args:

# Called on 2022-08-06
save_in_s3(df, ticker='GBP', granularity='H1', bucket='my_bucket', prefix='my_folder')

# Called on 2022-08-07
save_in_s3(df, ticker='INR', granularity='H2', bucket='my_bucket', prefix='my_folder')

Assuming the database already exists in Glue, the function calls above will write the following files to S3:

s3://my_bucket/my_folder/year=2022/month=8/day=6/ticker=GBP/granularity=H1/*.parquet
s3://my_bucket/my_folder/year=2022/month=8/day=7/ticker=INR/granularity=H2/*.parquet
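
To make the layout above concrete, here is a minimal sketch of how a Hive-style prefix can be assembled from partition column values, in the order given by partition_cols. The helper hive_prefix is hypothetical and only mirrors the name=value folder pattern shown in the listing; it is not the library's own code.

```python
def hive_prefix(base_path, partitions):
    """Join name=value segments under base_path (hypothetical helper).

    `partitions` is a dict whose insertion order plays the role of
    partition_cols.
    """
    segments = [f"{name}={value}" for name, value in partitions.items()]
    return "/".join([base_path.rstrip("/")] + segments) + "/"


prefix = hive_prefix(
    "s3://my_bucket/my_folder",
    {"year": 2022, "month": 8, "day": 6, "ticker": "GBP", "granularity": "H1"},
)
print(prefix)
# s3://my_bucket/my_folder/year=2022/month=8/day=6/ticker=GBP/granularity=H1/
```

Compared with the f-string path in the question, the values are the same but each folder also carries its column name, which is what lets Glue and Athena treat the folders as partitions.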

The corresponding Athena table will have the following partitions:

SHOW PARTITIONS "database_name"."table_name";

-- year=2022/month=8/day=6/ticker=GBP/granularity=H1
-- year=2022/month=8/day=7/ticker=INR/granularity=H2
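
If you need to work with these partition strings in Python, for example to pick out one ticker, they can be split back into column/value pairs. This is a minimal standard-library sketch; parse_partition is a hypothetical helper, and note that every value comes back as a string. As far as I know, the partition_filter callable accepted by wr.s3.read_parquet (with dataset=True) receives a similar dict of strings, but treat that as an assumption and check the documentation for your version.

```python
def parse_partition(spec):
    """Turn 'a=1/b=2' into {'a': '1', 'b': '2'} (hypothetical helper)."""
    return dict(segment.split("=", 1) for segment in spec.strip("/").split("/"))


partitions = [
    "year=2022/month=8/day=6/ticker=GBP/granularity=H1",
    "year=2022/month=8/day=7/ticker=INR/granularity=H2",
]

parsed = [parse_partition(p) for p in partitions]

# Keep only the partitions for one ticker; values are compared as strings
gbp_only = [p for p in parsed if p["ticker"] == "GBP"]
print(gbp_only)
```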

In short, wrangler creates both the partitions and the S3 folder structure for you.
