I want to generate a message for each object or key in my s3 bucket. I do have like ten keys. That is why I want to list all of them with list_objects_v2 and then pass them to SQS queue. Below, there is a sample of code I tried to use:
import json
import boto3
region = "us-east-2"
bucket = "s3-small-files-fiap"
prefix = 'folder/'
s3_client = boto3.client('s3', region_name=region)
response = s3_client.list_objects_v2(Bucket=bucket,
Prefix=prefix)
settings = {
"bucket_name": "s3-small-files-fiap",
"queue_name": "sqs-csv-to-json",
"region": region,
"account_number": <my_account_number>
}
bucket_notifications_configuration = {
'QueueConfigurations': [{
'Events': ['s3:ObjectCreated:*'],
'Id': 'Notifications',
'QueueArn':
'arn:aws:sqs:{region}:{account_number}:{queue_name}'.format(**settings)
}]
}
qpolicy = {
"Version": "2012-10-17",
"Id":
"arn:aws:sqs:{region}:{account_number}:{queue_name}/SQSDefaultPolicy".format(
**settings),
"Statement": [{
"Sid": "allow tmp bucket to notify",
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": "SQS:SendMessage",
"Resource": "arn:aws:sqs:{region}:{account_number}:{queue_name}".format(
**settings),
"Condition": {
"ArnLike": {
"aws:SourceArn": "arn:aws:s3:*:*:{bucket_name}".format(
**settings)
}
}
}]
}
print("Bucket notify", bucket_notifications_configuration)
print("Queue Policy", qpolicy)
queue_attrs = {"Policy": json.dumps(qpolicy), }
sqs_client = boto3.resource("sqs",
region_name=region).get_queue_by_name(
QueueName=settings["queue_name"])
sqs_client.set_attributes(Attributes=queue_attrs)
sqs_client.attributes
s3_client.put_bucket_notification_configuration(
Bucket=bucket,
NotificationConfiguration=bucket_notifications_configuration)
For some reason, its output generates just one message of notification as follows. How can I send notifications ten times instead of one using the code above?
Here is the example of the output:
Bucket notify {'QueueConfigurations': [{'Events': ['s3:ObjectCreated:*'], 'Id': 'Notifications', 'QueueArn': 'arn:aws:sqs:us-east-2:<my_account_number>:sqs-csv-to-json'}]}
Queue Policy {'Version': '2012-10-17', 'Id': 'arn:aws:sqs:us-east-2:<my_account_number>:sqs-csv-to-json/SQSDefaultPolicy', 'Statement': [{'Sid': 'allow tmp bucket to notify', 'Effect': 'Allow', 'Principal': {'AWS': '*'}, 'Action': 'SQS:SendMessage', 'Resource': 'arn:aws:sqs:us-east-2:<my_account_number>:sqs-csv-to-json', 'Condition': {'ArnLike': {'aws:SourceArn': 'arn:aws:s3:*:*:s3-small-files-fiap'}}}]}
CodePudding user response:
Create a method and move your SNS send code
QUEUE_NAME = os.getenv("QUEUE_NAME")
SQS = boto3.client("sqs")
## Inside handler method
s3_resource = boto3.resource('s3')
response = s3_client.list_objects_v2(Bucket=bucket,Prefix=prefix)
for file in response:
# call SQS send method here
try:
#logger.debug("Recording %s", file)
u = getQueueURL()
logging.debug("Got queue URL %s", u)
resp = SQS.send_message(QueueUrl=u, MessageBody=file)
#logger.debug("Send result: %s", resp)
except Exception as e:
raise Exception("Raised Exception! %s" % e)
def getQueueURL():
"""Retrieve the URL for the configured queue name"""
q = SQS.get_queue_url(QueueName=QUEUE_NAME).get('QueueUrl')
#logger.debug("Queue URL is %s", QUEUE_URL)
return q
CodePudding user response:
I followed the suggested example and here is the code: import boto3 import logging import os
logger = logging.getLogger()
logging.basicConfig(level=logging.INFO,
format='%(asctime)s: %(levelname)s: %(message)s')
queue_name = 'https://sqs.us-east-2.amazonaws.com/<my account>/sqs-csv-to-json'
sqs_client = boto3.client('sqs', region_name='us-east-2')
s3_client = boto3.client('s3', region_name='us-east-2')
bucket = "s3-small-files-fiap"
prefix = 'folder/'
## Inside handler method
#
def listingObjects(bucket, prefix):
response = s3_client.list_objects_v2(Bucket=bucket,Prefix=prefix)
for file in response:
# call SQS send method here
try:
#logger.debug("Recording %s", file)
u = getQueueURL()
logging.debug("Got queue URL %s", u)
resp = sqs_client.send_message(QueueUrl=u, MessageBody=file)
#logger.debug("Send result: %s", resp)
except Exception as e:
raise Exception("Raised Exception! %s" % e)
else:
return resp
def getQueueURL():
"""Retrieve the URL for the configured queue name"""
q = sqs_client.get_queue_url(QueueName=queue_name).get('QueueUrl')
#logger.debug("Queue URL is %s", QUEUE_URL)
return q
but I'm facing such a kind of exception as output:
2022-09-11 13:33:06,565: INFO: Found credentials in shared credentials file: ~/.aws/credentials
Traceback (most recent call last):
File "/home/felipediego/Studies/FIAP/teste6.py", line 25, in listingObjects
u = getQueueURL()
File "/home/felipediego/Studies/FIAP/teste6.py", line 36, in getQueueURL
q = sqs_client.get_queue_url(QueueName=queue_name).get('QueueUrl')
File "/home/felipediego/.local/lib/python3.8/site-packages/botocore/client.py", line 508, in _api_call
return self._make_api_call(operation_name, kwargs)
File "/home/felipediego/.local/lib/python3.8/site-packages/botocore/client.py", line 915, in _make_api_call
raise error_class(parsed_response, operation_name)
botocore.errorfactory.QueueDoesNotExist: An error occurred (AWS.SimpleQueueService.NonExistentQueue) when calling the GetQueueUrl operation: The specified queue does not exist for this wsdl version.
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/felipediego/Studies/FIAP/teste6.py", line 41, in <module>
list_objects = listingObjects(bucket, prefix)
File "/home/felipediego/Studies/FIAP/teste6.py", line 30, in listingObjects
raise Exception("Raised Exception! %s" % e)
Exception: Raised Exception! An error occurred (AWS.SimpleQueueService.NonExistentQueue) when calling the GetQueueUrl operation: The specified queue does not exist for this wsdl version.