Listing all objects in a bucket and sending the notification from all of them to AWS SQS using boto3

Time:09-12

I want to generate a message for each object (key) in my S3 bucket. I have about ten keys, so I want to list all of them with list_objects_v2 and then pass them to an SQS queue. Below is a sample of the code I tried:

import json
import boto3

region = "us-east-2"
bucket = "s3-small-files-fiap"
prefix = 'folder/'

s3_client = boto3.client('s3', region_name=region)

response = s3_client.list_objects_v2(Bucket=bucket,
                                     Prefix=prefix)

settings = {
    "bucket_name": "s3-small-files-fiap",
    "queue_name": "sqs-csv-to-json",
    "region": region,
    "account_number": <my_account_number>
}

bucket_notifications_configuration = {
    'QueueConfigurations': [{
        'Events': ['s3:ObjectCreated:*'],
        'Id': 'Notifications',
        'QueueArn':
        'arn:aws:sqs:{region}:{account_number}:{queue_name}'.format(**settings)
    }]
}

qpolicy = {
    "Version": "2012-10-17",
    "Id":
    "arn:aws:sqs:{region}:{account_number}:{queue_name}/SQSDefaultPolicy".format(
        **settings),
    "Statement": [{
        "Sid": "allow tmp bucket to notify",
        "Effect": "Allow",
        "Principal": {"AWS": "*"},
        "Action": "SQS:SendMessage",
        "Resource": "arn:aws:sqs:{region}:{account_number}:{queue_name}".format(
            **settings),
        "Condition": {
            "ArnLike": {
                "aws:SourceArn": "arn:aws:s3:*:*:{bucket_name}".format(
                    **settings)
            }
        }
    }]
}
print("Bucket notify", bucket_notifications_configuration)
print("Queue Policy", qpolicy)

queue_attrs = {"Policy": json.dumps(qpolicy), }

sqs_client = boto3.resource("sqs",
                   region_name=region).get_queue_by_name(
                       QueueName=settings["queue_name"])
sqs_client.set_attributes(Attributes=queue_attrs)
sqs_client.attributes

s3_client.put_bucket_notification_configuration(
    Bucket=bucket,
    NotificationConfiguration=bucket_notifications_configuration)

For some reason, this produces only one notification message. How can I send ten notifications, one per object, instead of one using the code above?

Here is an example of the output:

Bucket notify {'QueueConfigurations': [{'Events': ['s3:ObjectCreated:*'], 'Id': 'Notifications', 'QueueArn': 'arn:aws:sqs:us-east-2:<my_account_number>:sqs-csv-to-json'}]}
Queue Policy {'Version': '2012-10-17', 'Id': 'arn:aws:sqs:us-east-2:<my_account_number>:sqs-csv-to-json/SQSDefaultPolicy', 'Statement': [{'Sid': 'allow tmp bucket to notify', 'Effect': 'Allow', 'Principal': {'AWS': '*'}, 'Action': 'SQS:SendMessage', 'Resource': 'arn:aws:sqs:us-east-2:<my_account_number>:sqs-csv-to-json', 'Condition': {'ArnLike': {'aws:SourceArn': 'arn:aws:s3:*:*:s3-small-files-fiap'}}}]}
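
A note on why only one message shows up: `put_bucket_notification_configuration` only subscribes the queue to future `s3:ObjectCreated:*` events, so objects that are already in the bucket never trigger a notification. The single message is most likely the test event S3 sends when the configuration is saved (an assumption, since the message body is not shown). A minimal sketch of sending one message per existing object instead, reusing the bucket, prefix, and queue name from the question; the function names `iter_keys` and `send_key_messages` and the JSON message shape are hypothetical:

```python
import json


def iter_keys(s3_client, bucket, prefix):
    """Yield every object key under the prefix, following pagination."""
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # "Contents" is missing from a page when no object matches.
        for obj in page.get("Contents", []):
            yield obj["Key"]


def send_key_messages(sqs_client, queue_url, bucket, keys):
    """Send one SQS message per key and return the number sent."""
    sent = 0
    for key in keys:
        # Assumed message shape; adapt it to whatever the consumer expects.
        body = json.dumps({"bucket": bucket, "key": key})
        sqs_client.send_message(QueueUrl=queue_url, MessageBody=body)
        sent += 1
    return sent


def main():
    # Imported here so the helpers above can be exercised with stub clients.
    import boto3

    region = "us-east-2"
    bucket = "s3-small-files-fiap"
    prefix = "folder/"
    s3_client = boto3.client("s3", region_name=region)
    sqs_client = boto3.client("sqs", region_name=region)
    # get_queue_url takes the short queue name and returns the full URL.
    queue_url = sqs_client.get_queue_url(QueueName="sqs-csv-to-json")["QueueUrl"]
    count = send_key_messages(sqs_client, queue_url, bucket,
                              iter_keys(s3_client, bucket, prefix))
    print("Sent", count, "messages")
```

Calling `main()` with AWS credentials configured should put one message per key on the queue. The bucket notification can stay in place if newly uploaded objects should also produce messages.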

CodePudding user response:

Create a method and move your SQS send code into it, so that it is called once for each listed object:

import logging
import os

import boto3

QUEUE_NAME = os.getenv("QUEUE_NAME")
SQS = boto3.client("sqs")
s3_client = boto3.client("s3")


def getQueueURL():
    """Retrieve the URL for the configured queue name"""
    return SQS.get_queue_url(QueueName=QUEUE_NAME).get("QueueUrl")


## Inside handler method
def handler(bucket, prefix):
    # Look the queue URL up once instead of once per object
    u = getQueueURL()
    logging.debug("Got queue URL %s", u)

    # A single call returns at most 1000 keys, which is enough for ten objects
    response = s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix)

    # The response is a dict; the objects are listed under "Contents",
    # which is absent when nothing matches the prefix
    for obj in response.get("Contents", []):
        # call SQS send method here, one message per key
        resp = SQS.send_message(QueueUrl=u, MessageBody=obj["Key"])
        logging.debug("Send result: %s", resp)
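
With only about ten keys, as in the question, the sends could also be grouped: `send_message_batch` accepts up to 10 entries per request. A rough sketch under that assumption; `build_batches` and `send_in_batches` are hypothetical helper names, and the message body is simply the object key:

```python
def build_batches(keys, batch_size=10):
    """Group keys into lists of SendMessageBatch entries (at most 10 each)."""
    keys = list(keys)
    batches = []
    for start in range(0, len(keys), batch_size):
        chunk = keys[start:start + batch_size]
        batches.append([
            # Id only has to be unique within one batch request.
            {"Id": str(i), "MessageBody": key}
            for i, key in enumerate(chunk)
        ])
    return batches


def send_in_batches(sqs_client, queue_url, keys):
    """Send keys in batches and return the entries SQS reported as failed."""
    failed = []
    for entries in build_batches(keys):
        resp = sqs_client.send_message_batch(QueueUrl=queue_url, Entries=entries)
        # Individual entries can fail even when the request itself succeeds.
        failed.extend(resp.get("Failed", []))
    return failed
```

Here `sqs_client` would be a `boto3.client("sqs")` and `queue_url` the value returned by `get_queue_url`. An empty return value means every entry was accepted.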

CodePudding user response:

I followed the suggested example and here is the code:

import boto3
import logging
import os

logger = logging.getLogger()
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s: %(levelname)s: %(message)s')

queue_name = 'https://sqs.us-east-2.amazonaws.com/<my account>/sqs-csv-to-json'
sqs_client = boto3.client('sqs', region_name='us-east-2')
s3_client  = boto3.client('s3', region_name='us-east-2')
bucket = "s3-small-files-fiap"
prefix = 'folder/'
## Inside handler method

# 
def listingObjects(bucket, prefix):
    response = s3_client.list_objects_v2(Bucket=bucket,Prefix=prefix)

    for file in response:
        # call SQS send method here 
        try:
            #logger.debug("Recording %s", file)
            u = getQueueURL()
            logging.debug("Got queue URL %s", u)
            resp = sqs_client.send_message(QueueUrl=u, MessageBody=file)
            #logger.debug("Send result: %s", resp)
        except Exception as e:
            raise Exception("Raised Exception! %s" % e)
        else:
            return resp

def getQueueURL():
    """Retrieve the URL for the configured queue name"""
    q = sqs_client.get_queue_url(QueueName=queue_name).get('QueueUrl')
    #logger.debug("Queue URL is %s", QUEUE_URL)
    return q

However, I get the following exception as output:

2022-09-11 13:33:06,565: INFO: Found credentials in shared credentials file: ~/.aws/credentials
Traceback (most recent call last):
  File "/home/felipediego/Studies/FIAP/teste6.py", line 25, in listingObjects
    u = getQueueURL()
  File "/home/felipediego/Studies/FIAP/teste6.py", line 36, in getQueueURL
    q = sqs_client.get_queue_url(QueueName=queue_name).get('QueueUrl')
  File "/home/felipediego/.local/lib/python3.8/site-packages/botocore/client.py", line 508, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/home/felipediego/.local/lib/python3.8/site-packages/botocore/client.py", line 915, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.errorfactory.QueueDoesNotExist: An error occurred (AWS.SimpleQueueService.NonExistentQueue) when calling the GetQueueUrl operation: The specified queue does not exist for this wsdl version.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/felipediego/Studies/FIAP/teste6.py", line 41, in <module>
    list_objects = listingObjects(bucket, prefix)
  File "/home/felipediego/Studies/FIAP/teste6.py", line 30, in listingObjects
    raise Exception("Raised Exception! %s" % e)
Exception: Raised Exception! An error occurred (AWS.SimpleQueueService.NonExistentQueue) when calling the GetQueueUrl operation: The specified queue does not exist for this wsdl version.
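
The `QueueDoesNotExist` error above is consistent with `queue_name` holding the full queue URL: `get_queue_url` expects the short queue name (here `sqs-csv-to-json`), and the URL is what that call returns. A minimal sketch of a lookup that tolerates either form; `resolve_queue_url` is a hypothetical helper, not part of boto3:

```python
def resolve_queue_url(sqs_client, name_or_url):
    """Return a queue URL, given either a queue URL or a plain queue name."""
    if name_or_url.startswith(("http://", "https://")):
        # Already a URL: it can be passed to send_message as QueueUrl as-is.
        return name_or_url
    # A plain name has to be looked up through the SQS API.
    return sqs_client.get_queue_url(QueueName=name_or_url)["QueueUrl"]
```

With the values from the code above, either setting `queue_name = 'sqs-csv-to-json'` or passing the existing URL directly as `QueueUrl` should avoid the `GetQueueUrl` failure. Two further things to watch in that code: `for file in response` iterates over the top-level keys of the response dictionary rather than the objects under `response['Contents']`, and the `return resp` inside the loop stops after the first message.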