I have 3 tables in Athena and I want to run 'SELECT * FROM table' for each of the 3 tables. Below is the basic Python code that works for one table named 'client':
import json
import boto3
import time

def lambda_handler(event, context):
    client = boto3.client('athena')

    # Setup and perform query
    queryStart = client.start_query_execution(
        QueryString = 'SELECT * FROM client limit 10;',
        QueryExecutionContext = {
            'Database': 'dev'
        },
        ResultConfiguration = {
            'OutputLocation': 's3://bucketpune/athena-output/lambda/'
        }
    )

    # Observe results
    queryId = queryStart['QueryExecutionId']
    time.sleep(15)

    results = client.get_query_results(QueryExecutionId = queryId)

    for row in results['ResultSet']['Rows']:
        print(row)
I want this to loop over the other tables one by one, e.g. 'SELECT * FROM customers limit 10;'.
I used a for loop as shown below, but it only picks up the last table name, 'region'.
import json
import boto3
import time

table = ["client", "customer", "region"]

for x in table:
    def lambda_handler(event, context):
        client = boto3.client('athena')

        # Setup and perform query
        queryStart = client.start_query_execution(
            QueryString = 'SELECT * FROM ' + x + ' limit 10;',
            QueryExecutionContext = {
                'Database': 'dev'
            },
            ResultConfiguration = {
                'OutputLocation': 's3://bucketpune/athena-output/lambda/'
            }
        )

        # Observe results
        queryId = queryStart['QueryExecutionId']
        time.sleep(15)

        results = client.get_query_results(QueryExecutionId = queryId)

        for row in results['ResultSet']['Rows']:
            print(row)
CodePudding user response:
Your for x in table: loop must be inside your handler, not outside:
import json
import boto3
import time

table = ["client", "customer", "region"]
client = boto3.client('athena')

def lambda_handler(event, context):
    for x in table:
        # Setup and perform query
        queryStart = client.start_query_execution(
            QueryString = 'SELECT * FROM ' + x + ' limit 10;',
            QueryExecutionContext = {
                'Database': 'dev'
            },
            ResultConfiguration = {
                'OutputLocation': 's3://bucketpune/athena-output/lambda/'
            }
        )

        # Observe results
        queryId = queryStart['QueryExecutionId']
        time.sleep(15)

        results = client.get_query_results(QueryExecutionId = queryId)

        for row in results['ResultSet']['Rows']:
            print(row)
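As a side note (not required for the fix), an f-string may read more clearly than string concatenation when building the query string:

    QueryString = f'SELECT * FROM {x} limit 10;',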
CodePudding user response:
What @Marcin suggested is not bad, but since Athena is asynchronous you can take advantage of that, as long as you don't need the answer from a previous query. Athena works on a best-effort basis: it can take a second or half a minute to return your rows, so time.sleep(15) will not guarantee that the query has finished. Here's an example of a Python function that fetches data from Athena using an incremental wait (so you don't poll Athena every second and, in the worst case, hit API quotas):
def get_query_results(query):
    # Assumes `client = boto3.client('athena')` is defined at module level, as above.
    query_start = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': "{your_database}"
        },
        ResultConfiguration={
            'OutputLocation': "{s3_output_location}"
        }
    )
    query_id = query_start['QueryExecutionId']

    max_iterations = 10
    for i in range(max_iterations):
        print(f"{i} try")
        # Incremental wait: 1s, 2s, 3s, ... between polls
        time.sleep((i + 1) * 1)
        query_execution = client.get_query_execution(QueryExecutionId=query_id)
        status = query_execution['QueryExecution']['Status']['State']
        print(f"status = {status}")
        # 'QUEUED'|'RUNNING'|'SUCCEEDED'|'FAILED'|'CANCELLED'
        if status == 'QUEUED' or status == 'RUNNING':
            continue
        if status == 'FAILED' or status == 'CANCELLED':
            raise Exception('Query failed')
        else:
            results = client.get_query_results(QueryExecutionId=query_id)
            rows = []
            for row in results['ResultSet']['Rows']:
                rows.append(row)
            return rows

    print("Could not retrieve in specified time")
    return None
Now you can use that as a base for a function that first schedules all three (or n) queries and then checks for the answers periodically.
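Here is a minimal sketch of that idea, reusing the table list, database, and S3 output location from the question; the handler structure and the 10-iteration incremental wait are just illustrative assumptions, not a definitive implementation:

import time
import boto3

client = boto3.client('athena')
tables = ["client", "customer", "region"]

def lambda_handler(event, context):
    # 1) Schedule all queries up front so Athena can run them concurrently.
    query_ids = {}
    for table in tables:
        start = client.start_query_execution(
            QueryString=f'SELECT * FROM {table} limit 10;',
            QueryExecutionContext={'Database': 'dev'},
            ResultConfiguration={'OutputLocation': 's3://bucketpune/athena-output/lambda/'}
        )
        query_ids[table] = start['QueryExecutionId']

    # 2) Poll each query with an incremental wait until it leaves QUEUED/RUNNING.
    results = {}
    for table, query_id in query_ids.items():
        for i in range(10):
            time.sleep(i + 1)
            execution = client.get_query_execution(QueryExecutionId=query_id)
            state = execution['QueryExecution']['Status']['State']
            if state in ('QUEUED', 'RUNNING'):
                continue
            if state in ('FAILED', 'CANCELLED'):
                raise Exception(f'Query for {table} ended with state {state}')
            # SUCCEEDED: fetch the rows for this table
            results[table] = client.get_query_results(QueryExecutionId=query_id)['ResultSet']['Rows']
            break

    # 3) Use the results however you need; here they are simply printed.
    for table, rows in results.items():
        print(table, rows)
    return {'tables': list(results)}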