In my previous post, I asked how to save items in bulk to MySQL using Scrapy. The question is here:
Buffered items and bulk insert to Mysql using scrapy
With @Alexander's help I can now keep 1000 items in a cache. However, the cached items are still written to MySQL one by one when the cache is flushed, and my only remaining issue is speed. I suspect the cause is SQL that I haven't been able to optimize enough.
The logic for saving to SQL is as follows:
Add each item to the products table; if its product_id doesn't already exist, also add it to the new_products table. (I'm running a background script that deletes these rows from oldest to newest, so a maximum of about 50k rows are stored in total. That part is not a problem.)
MySQL is probably slowing down during the insert into the new_products table, because for every item it checks whether product_id already exists among the existing rows.
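To make the cost concrete, here is roughly what happens for every single cached item when it is flushed, condensed into a small helper from the save() method in the full pipeline.py below (cnx is the open mysql.connector connection, row is one item as a dict):

def save_one(cnx, row):
    cursor = cnx.cursor()

    # 1) the full list of existing ids is fetched again for EVERY item
    cursor.execute("SELECT DISTINCT product_id FROM products;")
    existing_ids = [r[0] for r in cursor.fetchall()]

    # 2) one INSERT into products per item
    item_query = (
        "INSERT INTO products "
        "(rowid, date, listing_id, product_id, product_name, price, url) "
        "VALUES (%(rowid)s, %(date)s, %(listing_id)s, %(product_id)s, "
        "%(product_name)s, %(price)s, %(url)s)"
    )
    cursor.execute(item_query, row)

    # 3) a second INSERT into new_products if the id was not seen before
    if row['product_id'] not in existing_ids:
        new_query = (
            "INSERT INTO new_products "
            "(product_rowid, date, listing_id, product_id, product_name, price, url) "
            "VALUES (%(rowid)s, %(date)s, %(listing_id)s, %(product_id)s, "
            "%(product_name)s, %(price)s, %(url)s)"
        )
        cursor.execute(new_query, row)

    cnx.commit()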
I would be very happy if you could suggest a method that lets me save 1000 items to the database in one go.
The pipeline.py I am using:
from __future__ import print_function
import logging
from scrapy import signals
from itemadapter import ItemAdapter
from mysql.connector import errorcode
from amazon_scraper.items import AmazonMobileDetailsItem
import mysql.connector
class AmazonScraperPipeline:
    table = 'products'
    table2 = 'new_products'
    conf = {
        'host': 'localhost',
        'user': 'xxxxxx',
        'password': 'xxxxxx',
        'database': 'xxxxxxx',
        'raise_on_warnings': True
    }

    def __init__(self, **kwargs):
        self._rows = []            # store rows temporarily
        self._cached_rows = 0      # number of cached rows
        self._cache_limit = 1000   # limit before saving to database
        self.cnx = self.mysql_connect()

    def open_spider(self, spider):
        print("spider open")

    def save_all(self):  # calls self.save method for all cached rows
        if len(self._rows) > 0:
            list(map(self.save, self._rows))
            self._cached_rows = 0  # reset the count
            self._rows = []        # reset the cache

    def cache_result(self, item):  # adds a new row to the cache
        self._rows.append(dict(item))
        self._cached_rows += 1
        if self._cached_rows >= self._cache_limit:  # checks if limit reached
            self.save_all()  # if it has been reached then save all rows

    def process_item(self, item, spider):
        print("Saving item into db ...")
        self.cache_result(item)  # cache this item
        return item

    def close_spider(self, spider):
        self.save_all()  # saves remaining rows once the spider closes
        self.cnx.close()

    def mysql_connect(self):
        try:
            return mysql.connector.connect(**self.conf)
        except mysql.connector.Error as err:
            if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
                print("Something is wrong with your user name or password")
            elif err.errno == errorcode.ER_BAD_DB_ERROR:
                print("Database does not exist")
            else:
                print(err)

    def save(self, row):
        cursor = self.cnx.cursor()
        cursor.execute("SELECT DISTINCT product_id FROM products;")
        existing_ids = [r[0] for r in cursor.fetchall()]
        create_query = ("INSERT INTO " + self.table +
                        " (rowid, date, listing_id, product_id, product_name, price, url) "
                        "VALUES (%(rowid)s, %(date)s, %(listing_id)s, %(product_id)s, "
                        "%(product_name)s, %(price)s, %(url)s)")
        # Insert new row into products
        cursor.execute(create_query, row)
        print("Item saved")
        product_id = row['product_id']
        if product_id not in existing_ids:
            create_query = ("INSERT INTO " + self.table2 +
                            " (product_rowid, date, listing_id, product_id, product_name, price, url) "
                            "VALUES (%(rowid)s, %(date)s, %(listing_id)s, %(product_id)s, "
                            "%(product_name)s, %(price)s, %(url)s)")
            cursor.execute(create_query, row)
            print("New Item saved")
        # Make sure data is committed to the database
        self.cnx.commit()
CodePudding user response:
You can eliminate the first query of the save method by running it once at initialization, storing the result as an instance variable, and then updating that set with new entries as items come in. Another performance boost should come from the executemany feature of the MySQL cursor, by passing all of the cached rows to the save method at once instead of one at a time.
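The core of the executemany idea, as a minimal sketch (cnx is an open mysql.connector connection, rows is the list of cached item dicts; table and column names as in the question):

def bulk_save(cnx, rows):
    """Insert all cached item dicts into `products` with one executemany call."""
    insert_products = (
        "INSERT INTO products "
        "(rowid, date, listing_id, product_id, product_name, price, url) "
        "VALUES (%(rowid)s, %(date)s, %(listing_id)s, %(product_id)s, "
        "%(product_name)s, %(price)s, %(url)s)"
    )
    cursor = cnx.cursor()
    # mysql-connector can rewrite simple INSERTs like this into a single
    # multi-row INSERT, so up to 1000 rows travel to the server in one query
    cursor.executemany(insert_products, rows)
    cnx.commit()
    cursor.close()

Applied to the whole pipeline, that looks like this: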
# same imports as in the question's pipeline.py
import mysql.connector
from mysql.connector import errorcode


class Pipeline:
    table = 'products'
    table2 = 'new_products'
    conf = {
        'host': 'localhost',
        'user': 'xxxxxx',
        'password': 'xxxxxx',
        'database': 'xxxxxxx',
        'raise_on_warnings': True
    }

    def __init__(self, **kwargs):
        self._rows = []              # store rows temporarily
        self._unique_products = []   # rows whose product_id is not in products yet
        self._cached_rows = 0        # number of cached rows
        self._cache_limit = 1000     # limit before saving to database
        self.cnx = self.mysql_connect()
        self.existing_ids = self.get_product_ids()

    def open_spider(self, spider):
        print("spider open")

    def save_all(self):  # saves all cached rows in one batch
        if len(self._rows) > 0:
            self.save(self._rows, self._unique_products)
            self._cached_rows = 0        # reset the count
            self._rows = []              # reset the cache
            self._unique_products = []

    def process_item(self, item, spider):
        row = dict(item)
        product_id = row['product_id']
        if product_id not in self.existing_ids:
            self._unique_products.append(row)
            self.existing_ids.add(product_id)
        self._rows.append(row)
        self._cached_rows += 1
        if self._cached_rows >= self._cache_limit:  # checks if limit reached
            self.save_all()  # if it has been reached then save all rows
        return item

    def close_spider(self, spider):
        self.save_all()  # saves remaining rows once the spider closes
        self.cnx.close()

    def mysql_connect(self):
        try:
            return mysql.connector.connect(**self.conf)
        except mysql.connector.Error as err:
            if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
                print("Something is wrong with your user name or password")
            elif err.errno == errorcode.ER_BAD_DB_ERROR:
                print("Database does not exist")
            else:
                print(err)

    def get_product_ids(self):  # runs the existence query once, at start-up
        cursor = self.cnx.cursor()
        cursor.execute("SELECT DISTINCT product_id FROM products;")
        return {row[0] for row in cursor.fetchall()}

    def save(self, rows, products):
        cursor = self.cnx.cursor()
        create_query = ("INSERT INTO " + self.table +
                        " (rowid, date, listing_id, product_id, product_name, price, url) "
                        "VALUES (%(rowid)s, %(date)s, %(listing_id)s, %(product_id)s, "
                        "%(product_name)s, %(price)s, %(url)s)")
        # Insert all cached rows in one call
        cursor.executemany(create_query, rows)
        # Make sure data is committed to the database
        self.cnx.commit()
        cursor.close()
        print("Saved {} items".format(len(rows)))
        create_query = ("INSERT INTO " + self.table2 +
                        " (product_rowid, date, listing_id, product_id, product_name, price, url) "
                        "VALUES (%(rowid)s, %(date)s, %(listing_id)s, %(product_id)s, "
                        "%(product_name)s, %(price)s, %(url)s)")
        new_cursor = self.cnx.cursor()
        new_cursor.executemany(create_query, products)
        self.cnx.commit()
        new_cursor.close()
        print("Saved {} new items".format(len(products)))
I am actually curious how much of a performance boost this gives, so please share the difference in runtime.