Storing CSV data into Dynamic paths using Athena Query (Start Query Execution)


I am trying to take JSON data that has been dumped into an input S3 bucket and convert it to CSV files in another S3 output location using Athena's StartQueryExecution.

  1. I am using a large query whose results are inserted into a temp table (using INSERT INTO).

  2. That table is partitioned by year, month, day, and hour.

  3. Using AWS Glue I was able to set up storage.location.template for the query table (See screen scrape)

    s3://prod-cog-kahala-test-output/data/landing/olo/baja/year=${year}/month=${month}/day=${day}/hour=${hour}

  4. I have also set up partition projection for year, month, day, and hour on this table using AWS Glue. (See screen scrape and the property sketch just below.)
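
For reference, the partition projection table properties look roughly like this (a sketch reconstructed from the description above; the exact ranges are assumptions, not values from the screen scrapes):

    projection.enabled = true
    projection.year.type = integer
    projection.year.range = 2020,2030
    projection.month.type = integer
    projection.month.range = 1,12
    projection.month.digits = 2
    projection.day.type = integer
    projection.day.range = 1,31
    projection.day.digits = 2
    projection.hour.type = integer
    projection.hour.range = 0,23
    projection.hour.digits = 2
    storage.location.template = s3://prod-cog-kahala-test-output/data/landing/olo/baja/year=${year}/month=${month}/day=${day}/hour=${hour}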

This output path is created dynamically from the date and time at which the event fired; the Athena query should write there the CSV files converted from the JSON that arrived during that window. The output path should follow the template shown above (see screen scrape).

I am using a Python Lambda to extract the event record's eventTime value and then, via an Athena query, output the CSV files to the dynamic output path.
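
For context, here is a minimal sketch of that flow, using Python's datetime instead of chained split() calls; the eventTime value is a hypothetical example, and the bucket path is taken from the template above:

    from datetime import datetime

    # Parse the S3 event's ISO-8601 eventTime into partition values.
    event_time = "2022-10-13T17:05:11.123Z"  # hypothetical example value
    dt = datetime.strptime(event_time, "%Y-%m-%dT%H:%M:%S.%fZ")
    fyear, fmonth, fday, fhour = (dt.strftime("%Y"), dt.strftime("%m"),
                                  dt.strftime("%d"), dt.strftime("%H"))

    # Build the dynamic output path from the partition values.
    output_path = (f"s3://prod-cog-kahala-test-output/data/landing/olo/baja/"
                   f"year={fyear}/month={fmonth}/day={fday}/hour={fhour}")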

Note: I have only been able to run this successfully using a static S3 path, but a dynamic S3 path is a requirement.

When an input JSON file is ingested into the input S3 bucket, I get the following error when Athena runs the query with a dynamic S3 path:

(<class 'botocore.errorfactory.InvalidRequestException'>, InvalidRequestException("An error occurred (InvalidRequestException) when calling the StartQueryExecution operation: line 45:64: mismatched input '{'. Expecting: '*', <expression>, <identifier>"), <traceback object at 0x7fac83b93300>).

Please help me determine what I am doing incorrectly. Thanks much.

Here are the table properties (see screen scrape).

Here is the lambda code:

import os
import sys
import boto3

# kahala_baja, kahala_coldstone (the SQL templates shown below), database,
# and today_year are defined elsewhere in the module.
athena_client = boto3.client('athena')

def lambda_handler(event, context):
    try:
        print('<< IN Handler >>')
        print(event)
        print("File size is", event['Records'][0]['s3']['object']['size'])
        if event['Records'][0]['s3']['object']['size'] > 0:
            file_path = event['Records'][0]['s3']['object']['key']
            print('<< FILEPATH >>', file_path)
            bucket_name = os.environ['BUCKETBAJA']
            file_name = file_path.split("/")[-1]
            print(file_name)
            # Process the input file name (today_year is defined elsewhere;
            # parse_filename is not used below)
            parse_filename = file_name.split(today_year)
            # Get date and hour from the event time stamp
            parse_date = event['Records'][0]['eventTime']
            dateStr = parse_date.split("-")
            print('<< PROCESS DATE >>', dateStr[2])
            fyear = dateStr[0]
            fmonth = dateStr[1]
            fday = dateStr[2].split("T")
            parse_more = fday
            hour = fday[1].split(":")
            fhour = hour[0]
            fday = fday[0]
            print('<< DAY >>', fday)
            print('<< HOUR >>', fhour)
            print('<< MONTH >>', fmonth)
            process_date = fyear + '-' + fmonth + '-' + fday
            start_date = fyear + '-' + fmonth + '-' + fday
            end_date = fyear + '-' + fmonth + '-' + fday
            output_prefix = "all_dates/year=" + fyear + "/month=" + fmonth + "/day=" + fday + "/hour=" + fhour
            print('<< OUT_PREFIX >>', output_prefix)
            clients3 = boto3.client('s3')
            result = clients3.list_objects(Bucket=bucket_name, Prefix=output_prefix)
            exists = False
            athena_out_path = 's3://' + bucket_name + '/all-dates/year=' + fyear + '/month=' + fmonth + '/day=' + fday + '/hour=' + fhour
            print('<<< INSIDE BAJA TABLE DUMP NEXT ... >>>>')
            if 'coldstone' in file_name:
                query = kahala_coldstone
                bucket_name = os.environ['BUCKETCOLDSTONE']
                athena_out_path = 's3://athena-' + bucket_name + '/all-dates/year=' + fyear + '/month=' + fmonth + '/day=' + fday + '/hour=' + fhour
            else:
                query = kahala_baja
                print('<< QUERY >>')
                response = athena_client.start_query_execution(
                    QueryString=query,
                    QueryExecutionContext={
                        'Database': database
                    },
                    ResultConfiguration={
                        'OutputLocation': athena_out_path,
                    }
                )
        else:
            print("Empty File, exiting")
    except:
        print("Unknown Error")
        print(sys.exc_info())

Here is the Query code:

    """insert into olo_baja_insert_to_csv
    select cast(first_name as varchar) first_name,
    cast(contact_number as varchar) contact_number,
    cast(membership_number as varchar) membership_number,
    cast(olo_customer_id as varchar) olo_customer_id,
    cast(login_providers as varchar) login_providers,
    cast(external_reference as varchar) external_reference,
    cast(olo_email_address as varchar) olo_email_address,
    cast(last_name as varchar) last_name,
    cast(loyalty_scheme as varchar) loyalty_scheme,
    cast(product_id as varchar) product_id,
    cast(modifier_detail['modifierid'] as varchar) modifier_detail_modifier_id,
    cast(modifier_detail['description'] as varchar) modifier_detail_description,
    cast(modifier_detail['vendorspecificmodifierid'] as varchar) modifier_detail_vendor_specific_modifier_id,
    cast(modifier_detail['modifiers'] as varchar) modifier_detail_modifiers,
    cast(modifier_detail['modifierquantity'] as varchar) modifier_detail_modifier_quantity,
    cast(modifier_detail['customfields'] as varchar) modifier_detail_custom_fields,
    cast(modifier_quantity as varchar) modifier_quantity,
    cast(modifier_custom_fields as varchar) modifier_custom_fields,
    cast(delivery as varchar) delivery,
    cast(total as varchar) total,
    cast(subtotal as varchar) subtotal,
    cast(discount as varchar) discount,
    cast(tip as varchar) tip,
    cast(sales_tax as varchar) sales_tax,
    cast(customer_delivery as varchar) customer_delivery, 
    cast(payment_amount as varchar) payment_amount, 
    cast(payment_description as varchar) payment_description,
    cast(payment_type as varchar) payment_type,
    cast(location_lat as varchar) location_lat,
    cast(location_long as varchar) location_long,
    cast(location_name as varchar) location_name,
    cast(location_logo as varchar) location_logo,
    cast(ordering_provider_name as varchar) ordering_provider_name,
    cast(ordering_provider_slug as varchar) ordering_provider_slug,year,month,day, hour
    from( 
    select cast(first_name as varchar) first_name,
    cast(contact_number as varchar) contact_number, 
    cast(membership_number as varchar) membership_number,
    cast(olo_customer_id as varchar) olo_customer_id,
    cast(login_providers as varchar) login_providers,
    cast(external_reference as varchar) external_reference,
    cast(olo_email_address as varchar) olo_email_address,
    cast(last_name as varchar) last_name,
    cast(loyalty_scheme as varchar) loyalty_scheme,
    cast(product_id as varchar) product_id,
    cast(special_instructions as varchar) special_instructions,
    cast(quantity as varchar) quantity,
    cast(recipient_name as varchar) recipient_name,
    cast(custom_values as varchar) custom_values,
    cast(item_description as varchar) item_description,
    cast(item_selling_price as varchar) item_selling_price,
    cast(modifier['sellingprice'] as varchar) pre_modifier_selling_price,
    cast(modifier['modifierid'] as varchar) modifier_id,
    cast(modifier['description'] as varchar) modifier_description,
    cast(modifier['vendorspecificmodifierid'] as varchar) vendor_specific_modifierid,
    cast(modifier['modifiers'] as varchar) modifier_details,
    cast(modifier['modifierquantity'] as varchar) modifier_quantity,
    cast(modifier['customfields'] as varchar) modifier_custom_fields,
    cast(delivery as varchar),cast(total as varchar),cast(subtotal as varchar), 
    cast(discount as varchar),cast(tip as varchar),cast(sales_tax as varchar),cast(customer_delivery as varchar), cast(payment_amount as varchar), cast(payment_description as varchar),
    cast(payment_type as varchar),cast(location_lat as varchar),cast(location_long as varchar), cast(location_name as varchar), 
    cast(location_logo as varchar),
    cast(ordering_provider_name as varchar), cast(ordering_provider_slug as varchar), year,month,day,hour
    from(
    select
    cast(json_extract(customer, '$.firstname') as varchar) as first_name,
    cast(json_extract(customer, '$.contactnumber') as varchar) as contact_number,
    cast(json_extract(customer, '$.membershipnumber') as varchar) as membership_number,
    cast(json_extract(customer, '$.customerid') as varchar) as olo_customer_id,
    cast(json_extract(customer, '$.loginproviders') as array<map<varchar,varchar>>) as login_providers,
    cast(json_extract(customer, '$.externalreference') as varchar) as external_reference,
    cast(json_extract(customer, '$.email') as varchar) as olo_email_address,
    cast(json_extract(customer, '$.lastname') as varchar) as last_name,
    cast(json_extract(customer, '$.loyaltyscheme') as varchar) as loyalty_scheme,
    try_cast(json_extract("item", '$.productid') as varchar) product_id,
    try_cast(json_extract("item", '$.specialinstructions') as varchar) special_instructions,
    try_cast(json_extract("item", '$.quantity') as varchar) quantity,
    try_cast(json_extract("item", '$.recipientname') as varchar) recipient_name,
    try_cast(json_extract("item", '$.customvalues') as varchar) custom_values,
    try_cast(json_extract("item", '$.description') as varchar) item_description,
    try_cast(json_extract("item", '$.sellingprice') as varchar) item_selling_price,
    cast(json_extract("item", '$.modifiers') as array<map<varchar,json>>) modifiers,
    cast(json_extract(totals, '$.delivery') as varchar) delivery,
    cast(json_extract(totals, '$.total') as varchar) total,
    cast(json_extract(totals, '$.subtotal') as varchar) subtotal,
    cast(json_extract(totals, '$.discount') as varchar) discount,
    cast(json_extract(totals, '$.tip') as varchar) tip,
    cast(json_extract(totals, '$.salestax') as varchar) sales_tax,
    cast(json_extract(totals, '$.customerdelivery') as varchar) customer_delivery,
    payment['amount'] payment_amount,
    payment['description'] payment_description,
    payment['type'] payment_type,
    cast(json_extract("location", '$.latitude') as varchar) location_lat,
    cast(json_extract("location", '$.longitude') as varchar) location_long,
    cast(json_extract("location", '$.name') as varchar) location_name,
    cast(json_extract("location", '$.logo') as varchar) location_logo,
    cast(json_extract("orderingprovider", '$.name') as varchar) ordering_provider_name,
    cast(json_extract("orderingprovider", '$.slug') as varchar) ordering_provider_slug,hour
    from sandbox_twilliams.olo_baja_raw_john_testing
    cross join unnest (payments, "items") as t (payment, item)
    --cross join unnest ("items") as t (item)
    --cross join unnest (payments) as t (payment)
    where year = {fyear}
    and month = {fmonth}
    and day = {fday}
    and hour = {fhour}
    ).format(year={fyear},month={fmonth},day={fday}, hour={fhour})  
    CROSS JOIN UNNEST (modifiers) as t (modifier)
    )
    CROSS JOIN UNNEST (cast(modifier_details as array<map<varchar,json>>)) as t (modifier_detail)
    group by 1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45;"""      

CodePudding user response:

There is a syntax error in your query, but you don't include the rendered query (after the placeholders are substituted) in the question, so it's hard to figure out what's going wrong. It looks like you print the query SQL in your logging, so I suggest taking that SQL and running it manually in the Athena console to see if you can figure out from the error message what's wrong.

On another note, converting to CSV is best done with UNLOAD. Athena's query results are CSV, but Athena also writes a binary metadata file alongside the CSV file, which can mess things up if you expect the output directory to contain only CSV data.
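
A minimal sketch of what UNLOAD could look like here, reusing the table, output path, and placeholder style from the question:

    unload_query = """
    UNLOAD (
        select * from olo_baja_insert_to_csv
        where year = {fyear} and month = {fmonth} and day = {fday} and hour = {fhour}
    )
    TO 's3://prod-cog-kahala-test-output/data/landing/olo/baja/year={fyear}/month={fmonth}/day={fday}/hour={fhour}/'
    WITH (format = 'TEXTFILE', field_delimiter = ',')
    """.format(fyear=fyear, fmonth=fmonth, fday=fday, fhour=fhour)

Note that UNLOAD writes no header row, and the TO location must be empty.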

CodePudding user response:

As I had commented to @Theo, it was an absolute mess attempting to get the INSERT INTO statement on that query working. Some tweaks to the query had to be made, and now the CSV files are being generated. The tweaks were:

The format() call was removed from inside the query string and applied just before handing the query to the Athena client for StartQueryExecution: query = kahala_coldstone.format(fyear=fyear, fmonth=fmonth, fday=fday, fhour=fhour)
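
Put together, the working call looks like this (a sketch using the variable names from the Lambda above):

    # Render the SQL template in Python first, then pass the finished
    # query text to Athena.
    query = kahala_coldstone.format(fyear=fyear, fmonth=fmonth, fday=fday, fhour=fhour)
    response = athena_client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={'Database': database},
        ResultConfiguration={'OutputLocation': athena_out_path},
    )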

Another reason the query wasn't running was a {month} binding that should have been {fmonth} (fmonth is the variable holding the month value from the event time stamp). The majority of the time was spent trying to get the INSERT to work, but it does not seem possible to do that insert from a JSON SerDe table into a CSV SerDe table. I wonder why that is.

Other than this, the motto for this fix is: omit the INSERT INTO statement at the beginning of the query string.
