I have a S3 function that is defined as follows:
def get_unprocessed_s3_object_list(s3_client, bucket: str) -> List[str]:
paginator = s3_client.get_paginator("list_objects_v2")
page_iterator = paginator.paginate(Bucket=bucket)
all_files = {}
for page in page_iterator:
if page["KeyCount"] > 0:
for item in page["Contents"]:
all_files[item["Key"]] = item["LastModified"]
return sorted(all_files, key=lambda k: k[1])
Now, I was hoping to write a unit test for this and tried to use the stubber object from botocore to be able to test it. I tried something like:
def test_unprocessed_s3_objects():
client = boto3.client("s3")
stubber = Stubber(client)
list_objects_v2_response = {
"Contents": [
{
"Key": "sensor_1",
"LastModified": "2021-11-30T12:58:14 00:00",
"ETag": '"3a3c5ca43d2f01dba42314c1ca7e2237"',
"Size": 1417,
"StorageClass": "STANDARD",
},
{
"Key": "sensor_2",
"LastModified": "2021-11-30T12:58:05 00:00",
"ETag": '"02bc99343bbdcbdefc9fe691b6f7deaa"',
"Size": 1332,
"StorageClass": "STANDARD",
},
]
}
stubber.add_response("list_objects_v2", list_objects_v2_response)
items = get_unprocessed_s3_object_list(client, "test")
assert len(items) == 2
However, this comes back with Access Denied with ListObjectsV2 ooperation
. Maybe this has to do with the bucket ID. I thought this would bypass the actual call.
CodePudding user response:
After calling stubber.add_response("list_objects_v2", list_objects_v2_response)
, you need to call stubber.activate()
in order for the stubber to be used.