How to execute SQL select statements for multiple tables in AWS Lambda?

Time:07-06

I have 3 tables in Athena and I want to run 'SELECT * FROM table' for each of them. Below is the basic Python code, which runs the query for one table named 'client':

import json
import boto3
import time

def lambda_handler(event, context):
    client = boto3.client('athena')
    
    #Setup and perform query
    queryStart = client.start_query_execution(
            QueryString = 'SELECT * FROM client limit 10;',
            QueryExecutionContext = {
                'Database': 'dev'
            },
            ResultConfiguration = {
                'OutputLocation': 's3://bucketpune/athena-output/lambda/'
            }
        )
        
    #Observe results
    queryId = queryStart['QueryExecutionId']
    time.sleep(15)
    
    results = client.get_query_results(QueryExecutionId = queryId)
    for row in results['ResultSet']['Rows']:
        print(row)

I want this to loop over the other tables one by one, e.g. 'SELECT * FROM customers limit 10;'.

I used a for loop as shown below, but it only queries the last table name, 'region'.

import json
import boto3
import time

table = ["client", "customer", "region"]
for x in table:
    def lambda_handler(event, context):
        client = boto3.client('athena')
        
        #Setup and perform query
        queryStart = client.start_query_execution(
                QueryString = 'SELECT * FROM ' + x + ' limit 10;',
                QueryExecutionContext = {
                    'Database': 'dev'
                },
                ResultConfiguration = {
                    'OutputLocation': 's3://bucketpune/athena-output/lambda/'
                }
            )
            
        #Observe results
        queryId = queryStart['QueryExecutionId']
        time.sleep(15)
        
        results = client.get_query_results(QueryExecutionId = queryId)
        for row in results['ResultSet']['Rows']:
            print(row)

CodePudding user response:

Your for x in table: loop must be inside your handler, not outside it:

import json
import boto3
import time

table = ["client", "customer", "region"]
client = boto3.client('athena')

def lambda_handler(event, context):
    
    for x in table:
        #Setup and perform query
        queryStart = client.start_query_execution(
                QueryString = 'SELECT * FROM ' + x + ' limit 10;',
                QueryExecutionContext = {
                    'Database': 'dev'
                },
                ResultConfiguration = {
                    'OutputLocation': 's3://bucketpune/athena-output/lambda/'
                }
            )
            
        #Observe results
        queryId = queryStart['QueryExecutionId']
        time.sleep(15)
        
        results = client.get_query_results(QueryExecutionId = queryId)
        for row in results['ResultSet']['Rows']:
            print(row)
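
As for why the original version only ever queried 'region': each pass of the outer loop redefines lambda_handler, so only the last definition survives, and x is looked up when the function is called, by which time the loop has finished. A minimal sketch of that behaviour without any AWS calls (the handler name and the returned string are only for illustration):

```python
tables = ["client", "customer", "region"]

for x in tables:
    # Each iteration rebinds the name `handler` to a new function object.
    def handler():
        # `x` is read when handler() is called, not when it is defined.
        return f"SELECT * FROM {x} limit 10;"

# The loop has finished, so x is "region" and only one handler exists.
print(handler())  # prints: SELECT * FROM region limit 10;
```

Lambda calls the handler once per invocation, so the loop has to live inside it.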

CodePudding user response:

What @Marcin suggested works, but since Athena runs queries asynchronously, you can take advantage of that (as long as you don't need the answer from a previous query!). Athena works on a best-effort basis: it can take a second or half a minute to return your rows, so remember that time.sleep(15) does not ensure that Athena has finished processing. Here's an example of a Python function that fetches data from Athena using an incremental wait (so that it doesn't poll Athena every second and, in the worst case, hit API quotas):

import time
import boto3

client = boto3.client('athena')

def get_query_results(query):
    query_start = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': "{your_database}"
        },
        ResultConfiguration={
            'OutputLocation': "{s3_output_location}"
        }
    )

    query_id = query_start['QueryExecutionId']
    max_iterations = 10
    for i in range(max_iterations):
        print(f"{i} try")
        time.sleep((i + 1) * 1)
        query_execution = client.get_query_execution(QueryExecutionId=query_id)
        status = query_execution['QueryExecution']['Status']['State']
        print(f"status = {status}")
        # 'QUEUED'|'RUNNING'|'SUCCEEDED'|'FAILED'|'CANCELLED'
        if status == 'QUEUED' or status == 'RUNNING':
            continue
        if status == 'FAILED' or status == 'CANCELLED':
            # Strings cannot be raised in Python 3; raise a real exception
            raise RuntimeError(f"Query failed with status {status}")
        else:
            results = client.get_query_results(QueryExecutionId=query_id)
            rows = []
            for row in results['ResultSet']['Rows']:
                rows.append(row)
            return rows

    print("Could not retrieve in specified time")
    return None

Now you can use that as a base for a function that first schedules all three (or n) queries and then checks for the answers periodically.
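
A rough sketch of what such a function might look like, assuming the same query shape as in the question. The name run_queries, its parameters, and the injectable sleep argument are my own choices for illustration, not part of boto3; only start_query_execution, get_query_execution and get_query_results are real Athena client calls. It reads just the first page of results, like the function above.

```python
import time


def run_queries(client, tables, database, output_location,
                max_iterations=10, sleep=time.sleep):
    """Start one query per table, then poll until all finish or attempts run out.

    Returns {table: rows}; rows is None if the query failed, was cancelled,
    or did not finish within max_iterations polling rounds.
    """
    # Phase 1: submit every query up front. start_query_execution returns
    # immediately with an id, so the queries run concurrently in Athena.
    pending = {}
    for table in tables:
        started = client.start_query_execution(
            QueryString=f"SELECT * FROM {table} limit 10;",
            QueryExecutionContext={"Database": database},
            ResultConfiguration={"OutputLocation": output_location},
        )
        pending[table] = started["QueryExecutionId"]

    # Phase 2: poll all unfinished queries, waiting a bit longer each round.
    results = {}
    for i in range(max_iterations):
        if not pending:
            break
        sleep(i + 1)
        for table, query_id in list(pending.items()):
            execution = client.get_query_execution(QueryExecutionId=query_id)
            state = execution["QueryExecution"]["Status"]["State"]
            if state in ("QUEUED", "RUNNING"):
                continue
            del pending[table]
            if state == "SUCCEEDED":
                page = client.get_query_results(QueryExecutionId=query_id)
                results[table] = page["ResultSet"]["Rows"]
            else:  # FAILED or CANCELLED
                results[table] = None

    # Anything still pending ran out of polling rounds.
    for table in pending:
        results[table] = None
    return results
```

Inside the handler this would be called along the lines of run_queries(boto3.client('athena'), ["client", "customer", "region"], 'dev', 's3://bucketpune/athena-output/lambda/'). Passing the client in as an argument also makes it easy to try the function with a stub instead of a real Athena connection.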
