This Lambda is triggered by SQS; it fetches messages from the queue and updates a DynamoDB table. Two policies are attached to the Lambda: one so that it can be called by SQS (get queue) and one so that it can put_item into the DynamoDB table.
import boto3
import json
import logging
import sys

logging.basicConfig(stream=sys.stdout, level=logging.INFO)

queue = boto3.resource('sqs', region_name='us-east-1').get_queue_by_name(QueueName="erjan")
table = boto3.resource('dynamodb', region_name='us-east-1').Table('Votes')


def process_message(message):
    logging.info('----------process_message----------------------')
    logging.info('-------------SQS auto genereated msg------------------------')
    logging.info(type(message))
    try:
        logging.info('----------process_message----------------------')
        payload = message.message_attributes
        voter = payload['voter']['StringValue']
        vote = payload['vote']['StringValue']
        logging.info("Voter: %s, Vote: %s", voter, vote)
        update_count(vote)
        message.delete()
    except Exception as e:
        print('-----EXCEPTION-----')


def update_count(vote):
    logging.info('update count....')
    cur_count = 0
    if vote == 'b':
        logging.info('vote is b - update...')
        response = table.get_item(Key = {'voter':'count'})
        item = response['Item']
        item['b'] =1
        table.put_item(Item = item)
    elif vote == 'a':
        logging.info('vote is a - update...')
        table.update_item(
            Key={'voter':'count'},
            UpdateExpression="ADD a :incr",
            ExpressionAttributeValues={':incr': 1})


def lambda_handler(event,context):
    logging.info('--------inside main-------')
    try:
        logging.info('--------------------------------------')
        logging.info(event)
        logging.info('------------------------inside try - queue.receive_messages-------------')
        messages = queue.receive_messages(MessageAttributeNames=['vote','voter'])
        logging.info(messages)
        logging.info('--------------------------------------')
        for message in messages:
            logging.info('----------every msg -------------')
            print('----------every msg -------------')
            process_message(message)
        return {'statusCode': 200, 'body': '{"status": "success"}'}
    except Exception as e:
        logging.error(e)
        return {'statusCode': 500, 'body': '{"status": "error"}'}
The test event:
{
  "Records": [
    {
      "messageId": "19dd0b57-b21e-4ac1-bd88-01bbb068cb78",
      "receiptHandle": "MessageReceiptHandle",
      "body": "",
      "attributes": {
        "ApproximateReceiveCount": "1"
      },
      "messageAttributes": {
        "vote": {
          "Type": "String",
          "StringValue": "b"
        },
        "voter": {
          "Type": "String",
          "StringValue": "count"
        }
      },
      "awsRegion": "us-east-1"
    }
  ]
}
Running the test event only returns the 200 success result. It does not show any of the logs or prints from the lambda_handler() function.
I checked the CloudWatch logs and saw the same output: just three lines (start request_id, end request_id, report). The Lambda actually only polls the SQS queue (which does exist); the event and context arguments are not used.
It does not print even the basic logging.info('--------inside main-------') or the other logs under try:
logging.info('--------------------------------------')
logging.info(event)
logging.info('------------------------inside try - queue.receive_messages-------------')
CodePudding user response:
For some reason, just adding the SQS trigger in the AWS console did not work. I had to add a resource-based policy statement in the Lambda's configuration.
Then you can send test messages (events) from the SQS console with the "Send and receive messages" button.
Under triggers there should be two things: SQS and Lambda.
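If you would rather send the test message from code than from the console, something like the following should work. It is only a sketch: build_vote_message is a hypothetical helper, the queue name "erjan" is taken from the question, and the actual send_message call needs AWS credentials, so it is left commented out.

```python
def build_vote_message(voter, vote):
    """Build keyword arguments for Queue.send_message (hypothetical helper)."""
    return {
        # SQS rejects an empty body, so a placeholder string is used here.
        'MessageBody': 'vote',
        'MessageAttributes': {
            'voter': {'DataType': 'String', 'StringValue': voter},
            'vote': {'DataType': 'String', 'StringValue': vote},
        },
    }


kwargs = build_vote_message('count', 'b')
print(kwargs['MessageAttributes']['vote']['StringValue'])  # b

# With credentials configured, the message could then be sent like this:
# import boto3
# queue = boto3.resource('sqs', region_name='us-east-1').get_queue_by_name(QueueName='erjan')
# queue.send_message(**kwargs)
```

The attribute names 'voter' and 'vote' match the ones the handler in the question looks up.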
CodePudding user response:
The payload is being read from the wrong object:
payload = message.message_attributes
It should be:
payload = record['messageAttributes']
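As a minimal sketch of reading the attributes from the event record rather than from a boto3 Message object: the helper name extract_vote is hypothetical, and it accepts both the "stringValue" key that SQS-triggered events normally carry and the "StringValue" key used in the test event from the question.

```python
def extract_vote(record):
    """Return (voter, vote) from one entry of event['Records'].

    Hypothetical helper: accepts either 'stringValue' (events delivered
    by an SQS trigger) or 'StringValue' (the test event in the question).
    """
    attrs = record.get('messageAttributes', {})

    def value(name):
        attr = attrs[name]
        if 'stringValue' in attr:
            return attr['stringValue']
        return attr['StringValue']

    return value('voter'), value('vote')


# Record shaped like the test event in the question.
test_record = {
    'messageAttributes': {
        'vote': {'Type': 'String', 'StringValue': 'b'},
        'voter': {'Type': 'String', 'StringValue': 'count'},
    }
}
print(extract_vote(test_record))  # ('count', 'b')
```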
That is one thing; there are a few other points as well. You need to parse the JSON body before you can use its properties. You probably have a single event at the moment, but when the SQS events scale you may need to handle them in a loop:
for record in event['Records']:
    payload = json.loads(record['body'], parse_float=str)
    voter = record['messageAttributes']['voter']['stringValue']
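Putting the loop together with the DynamoDB update, a handler could look roughly like this. This is a sketch under assumptions, not a drop-in replacement: make_handler is a hypothetical factory that takes the table as an argument (so it can be tried with a stand-in object), the key {'voter': 'count'} and the 'a'/'b' columns come from the question, and the records are assumed to use the "stringValue" key.

```python
import logging

logger = logging.getLogger(__name__)


def make_handler(table):
    """Build a handler bound to a DynamoDB Table-like object (hypothetical factory)."""

    def lambda_handler(event, context):
        # The messages arrive in the event; there is no need to poll the queue.
        for record in event.get('Records', []):
            attrs = record['messageAttributes']
            voter = attrs['voter']['stringValue']
            vote = attrs['vote']['stringValue']
            logger.info("Voter: %s, Vote: %s", voter, vote)
            if vote not in ('a', 'b'):
                continue  # ignore unknown options
            # Atomically add 1 to the column named after the vote.
            table.update_item(
                Key={'voter': 'count'},
                UpdateExpression='ADD #v :incr',
                ExpressionAttributeNames={'#v': vote},
                ExpressionAttributeValues={':incr': 1},
            )
        return {'statusCode': 200, 'body': '{"status": "success"}'}

    return lambda_handler
```

In the deployed function this would be wired up with something like lambda_handler = make_handler(boto3.resource('dynamodb').Table('Votes')). With an SQS trigger, Lambda removes the messages from the queue itself once the handler returns successfully, so no explicit message.delete() is needed.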
I found this blog useful for your case, especially the Python examples in step 2: https://hevodata.com/learn/sqs-to-dynamodb/
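On the missing log lines themselves: as far as I know, the Lambda Python runtime installs its own handler on the root logger before your module is imported, so logging.basicConfig(...) has no effect there and the root level stays at WARNING, which hides every logging.info call. The print calls in the question sit inside the for loop (and the except branch), so they never run when receive_messages returns an empty list. A minimal sketch of setting the level explicitly, with an illustrative handler body:

```python
import logging

# Set the level on the root logger directly instead of calling basicConfig.
logger = logging.getLogger()
logger.setLevel(logging.INFO)


def lambda_handler(event, context):
    logger.info('inside main')
    logger.info('received %d record(s)', len(event.get('Records', [])))
    return {'statusCode': 200, 'body': '{"status": "success"}'}
```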