I currently have a script that fires a request to an API endpoint which returns a csv.gzip
file containing roughly 75,000 rows and 15 columns. I download this file to the web server's disk storage, unzip it to .csv,
then loop through every row and add the data to my database, and finally delete the files saved to disk. The process currently takes between 5 and 10 minutes to complete.
I'm sure there are areas of improvement, but I'm not sure how to implement them. Some of them are:
- Save the CSV data to a variable rather than to disk.
- Bulk import the data into my database.
I'm sure there are other improvements to be made, so any advice would be appreciated.
import base64
import csv
import gzip
import os
import shutil
import xml.etree.ElementTree as ET
from datetime import datetime
from decimal import Decimal

# oauth (an authenticated session), realm and the pricing model are defined elsewhere in the app.
response = oauth.get(realm)
content = ET.fromstring(response.content)
coded_string = content.find('.//pricefile')
decoded_string = base64.b64decode(coded_string.text)

# Write the gzipped payload to disk, then decompress it to a .csv file.
with open('test.csv.gzip', 'wb') as f:
    f.write(decoded_string)

with gzip.open('test.csv.gzip', 'rb') as f_in:
    with open('test.csv', 'wb') as f_out:
        shutil.copyfileobj(f_in, f_out)

# Loop over the CSV and upsert one row at a time.
with open('test.csv') as f:
    reader = csv.reader(f)
    next(reader)  # skip the header row
    for row in reader:
        pricing.objects.update_or_create(
            product_id=row[0],
            date=datetime.now(),
            defaults={
                'average_price': Decimal(row[1] or 0),
                'low_price': Decimal(row[2] or 0),
                'high_price': Decimal(row[3] or 0),
                ...
            })

os.remove('test.csv')
os.remove('test.csv.gzip')
CodePudding user response:
You should probably use bulk_create with update_conflicts (available since Django 4.1), which replaces the per-row update_or_create calls with batched upserts. Something like:
pricing.objects.bulk_create(
    [
        pricing(
            product_id=row[0],
            date=datetime.now(),
            average_price=Decimal(row[1] or 0),
            …
        )
        for row in reader
    ],
    update_conflicts=True,
    update_fields=["date", "average_price", …],
    unique_fields=["product_id"],
)
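This also covers the first point in the question (no files on disk): the gzip payload can be decompressed in memory and parsed straight from a string buffer. A rough sketch, assuming decoded_string from the question, Django 4.1+ for update_conflicts, and a unique constraint on product_id (the field list here is illustrative, not complete):
import csv
import gzip
import io
from datetime import datetime
from decimal import Decimal

# Decompress the base64-decoded payload in memory instead of writing
# test.csv.gzip / test.csv to disk.
csv_text = gzip.decompress(decoded_string).decode('utf-8')
reader = csv.reader(io.StringIO(csv_text))
next(reader)  # skip the header row

now = datetime.now()
rows = [
    pricing(
        product_id=row[0],
        date=now,
        average_price=Decimal(row[1] or 0),
        low_price=Decimal(row[2] or 0),
        high_price=Decimal(row[3] or 0),
    )
    for row in reader
]

# One INSERT ... ON CONFLICT per batch instead of one query per row.
pricing.objects.bulk_create(
    rows,
    batch_size=1000,
    update_conflicts=True,
    update_fields=["date", "average_price", "low_price", "high_price"],
    unique_fields=["product_id"],
)
batch_size keeps each generated statement to a manageable size, and for 75,000 rows a batched upsert is typically far faster than 75,000 individual update_or_create queries.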
CodePudding user response:
An example using psycopg2 directly. names.csv is derived from the US Social Security names database and has 260,000 rows and four columns.
import psycopg2
import csv
import time

con = psycopg2.connect(dbname="test", host='localhost', user='postgres', port=5432)
cur = con.cursor()

with open("/home/aklaver/software_projects/test_data/data_sets/names/names.csv", newline="") as csv_file:
    t1 = time.time()
    # Stage the CSV in a temp table via COPY, then upsert into the real table.
    cur.execute('create temp table names_csv_tmp(name varchar, rank integer, gender varchar, year integer, PRIMARY KEY(name, gender, year))')
    cur.execute('create table if not exists names_csv(name varchar, rank integer, gender varchar, year integer, PRIMARY KEY(name, gender, year))')
    cur.copy_expert("COPY names_csv_tmp FROM STDIN WITH CSV HEADER", csv_file)
    cur.execute("insert into names_csv select * from names_csv_tmp ON CONFLICT (name, gender, year) DO UPDATE SET name = EXCLUDED.name, gender = EXCLUDED.gender")
    cur.execute("drop table names_csv_tmp")
    con.commit()
    t2 = time.time()
    print(t2 - t1)
For the first run, where the table names_csv was empty and the INSERT simply inserted all the records from names_csv_tmp, the timing was 3.66 seconds. For the second run, where the ON CONFLICT triggered updates, the run time was 7.29 seconds.
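The same COPY-into-a-temp-table pattern can be combined with the in-memory decompression discussed above, since copy_expert accepts any file-like object, not just a file on disk. A rough sketch, assuming decoded_string from the question and a pricing table whose columns line up with the CSV (table and column names here are placeholders):
import gzip
import io

import psycopg2

con = psycopg2.connect(dbname="test", host="localhost", user="postgres", port=5432)
cur = con.cursor()

# Decompress the downloaded payload in memory; nothing is written to disk.
csv_buffer = io.StringIO(gzip.decompress(decoded_string).decode("utf-8"))

# Stage the rows in a temp table, then upsert into the real pricing table.
cur.execute("create temp table pricing_tmp (like pricing including all)")
cur.copy_expert("COPY pricing_tmp FROM STDIN WITH CSV HEADER", csv_buffer)
cur.execute("""
    insert into pricing
    select * from pricing_tmp
    on conflict (product_id) do update
        set average_price = excluded.average_price,
            low_price = excluded.low_price,
            high_price = excluded.high_price
""")
cur.execute("drop table pricing_tmp")
con.commit()
The ON CONFLICT clause requires a unique constraint on pricing(product_id); if the real table has more columns than the CSV, list the target columns explicitly in the COPY and INSERT statements instead of relying on select *.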