I am still new to working with AWS and S3, so I appreciate any insight. I am trying to retrieve the most recent file within each of multiple subfolders in a single S3 bucket. There are over 1,000 folders within this bucket, so I am using boto3.resource.
Once I have identified the last file in each subfolder, I want to open the file (.json) and parse it into a sqlite3 database, but I am unsure how to get from S3 to sqlite3 or where to start.
Here is the code I have so far:
import os
import boto3
import sqlite3
import json
from datetime import datetime

# to get the folder name and file name and split
def main_request():
    s3 = boto3.resource('s3')
    bucket = s3.Bucket('bucket_name')
    for obj in bucket.objects.all():
        #print(obj.key)
        #key = obj.key
        path, filename = os.path.split(obj.key)
        return path, filename
# attempting to get the last modified file
def last_updated_file(response):
    bucket_name = 'bucket_name'
    s3 = boto3.resource('s3')
    bucket = s3.Bucket(bucket_name)
    x = None
    for x in bucket.objects.filter(Prefix=response[0]):
        result = x.last_modified
        print("Result is: " + str(response), result)
    last_modified_file = x.last_modified
    return last_modified_file

last_updated_file(main_request())
However, when I printed the results to check my work, the output is incorrect, as shown below. It pulls the latest file correctly, but the last modified dates are wrong, there are duplicates, and it isn't pulling any other folders.
Result is: ('path1', '080922_0.json') 2022-06-02 05:28:25+00:00
Result is: ('path1', '080922_0.json') 2022-06-03 05:26:54+00:00
Result is: ('path1', '080922_0.json') 2022-06-04 05:25:48+00:00
Result is: ('path1', '080922_0.json') 2022-06-05 05:25:21+00:00
Result is: ('path1', '080922_0.json') 2022-06-06 05:27:41+00:00
Result is: ('path1', '080922_0.json') 2022-06-07 05:26:21+00:00
Result is: ('path1', '080922_0.json') 2022-06-08 05:27:39+00:00
Result is: ('path1', '080922_0.json') 2022-06-09 05:25:34+00:00
That is where I am stuck; I am unsure how to read all of the final files into the sqlite db.
I have some code from another Stack Overflow question to read the .json file, which starts out as:
content_object = s3.Object('bucket_name', 'key')
file_content = content_object.get()['Body'].read().decode('utf-8')
json_content = json.loads(file_content)
Thank you in advance.
CodePudding user response:
There are two main issues with your code:
First off, you're not actually finding the key of the last modified item in a shared prefix; you're finding the last item when the keys are sorted lexicographically. That might happen to be the last uploaded object, but it won't be in all cases.
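To make that distinction concrete, here is a minimal sketch of the per-prefix bookkeeping using simulated listing entries. The keys and timestamps below are made up for illustration; real code would iterate the boto3 ObjectSummary objects from bucket.objects.all() rather than tuples:

```python
from datetime import datetime, timezone

# Simulated listing: (key, last_modified) pairs standing in for the
# ObjectSummary objects a real bucket.objects.all() would yield.
# Note: "080822" sorts BEFORE "080922" lexicographically, but was
# modified more recently.
listing = [
    ("path1/080822_0.json", datetime(2022, 6, 8, tzinfo=timezone.utc)),
    ("path1/080922_0.json", datetime(2022, 6, 2, tzinfo=timezone.utc)),
    ("path2/080722_0.json", datetime(2022, 6, 5, tzinfo=timezone.utc)),
]

def latest_per_prefix(objects):
    """Return {prefix: (key, last_modified)}, keeping only the newest
    object seen for each shared prefix."""
    latest = {}
    for key, modified in objects:
        prefix = "/".join(key.split("/")[:-1])
        if prefix not in latest or latest[prefix][1] < modified:
            latest[prefix] = (key, modified)
    return latest

result = latest_per_prefix(listing)
print(result["path1"][0])  # path1/080822_0.json, not the lexicographic last
```

Sorting the keys would have picked 080922_0.json for path1, but comparing last_modified picks 080822_0.json, which is what the question actually wants.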
Secondly, you're unnecessarily making two requests to get the last modified timestamp. The first request to list all objects will return the last modified timestamp, so there's no need to perform a second request to get it.
Once you fix those two issues, then you just need to store the results in a database:
import boto3
import sqlite3
import json

target_bucket = "example-bucket"

s3 = boto3.resource('s3')
bucket = s3.Bucket(target_bucket)

last_in_prefix = {}
for obj in bucket.objects.all():
    # Get the shared prefix for each object
    prefix = "/".join(obj.key.split('/')[:-1])
    # If the common prefix is a previously unknown path, or
    # this object has a last modified time after whatever we
    # last saw, store this object as the target object
    # for this shared prefix
    if prefix not in last_in_prefix or last_in_prefix[prefix].last_modified < obj.last_modified:
        last_in_prefix[prefix] = obj

# At this point we have the last object in each
# shared prefix, so open a database
db = sqlite3.connect("example.db")
# Create a table if needed
db.execute("CREATE TABLE IF NOT EXISTS items(key TEXT PRIMARY KEY, data TEXT NOT NULL);")
db.commit()

for path, obj in last_in_prefix.items():
    # Only operate on .json objects for this example
    if obj.key.endswith(".json"):
        if db.execute("SELECT count(*) FROM items WHERE key=?;", (obj.key,)).fetchone()[0] == 0:
            # This item doesn't exist, download it and save it
            print(f"Downloading {obj.key}...")
            data = obj.get()['Body'].read().decode('utf-8')
            db.execute("INSERT INTO items(key, data) VALUES(?, ?);", (obj.key, data))
            db.commit()
        else:
            print(f"Already have {obj.key}")
        data = db.execute("SELECT data FROM items WHERE key=?;", (obj.key,)).fetchone()[0]
        data = json.loads(data)
        # TODO: Do something interesting with the data now that it's loaded
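Once the rows are stored, getting the data back out is a plain sqlite3/json round trip that needs no AWS access at all. A minimal sketch against an in-memory database with the same items schema (the key and payload here are made up for illustration):

```python
import sqlite3
import json

# In-memory database with the same schema as the example above.
db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE IF NOT EXISTS items(key TEXT PRIMARY KEY, data TEXT NOT NULL);")

# Pretend this row was written by the download loop; the payload is a
# made-up stand-in for one of the bucket's .json files.
db.execute("INSERT INTO items(key, data) VALUES(?, ?);",
           ("path1/080922_0.json", json.dumps({"status": "ok", "count": 3})))
db.commit()

# Fetch the stored text and parse it back into a Python dict.
row = db.execute("SELECT data FROM items WHERE key=?;",
                 ("path1/080922_0.json",)).fetchone()
record = json.loads(row[0])
print(record["count"])  # 3
```

Storing the raw JSON text and parsing it on read keeps the table schema simple; if you later need to query individual fields, you can promote them to their own columns or use SQLite's json_extract.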