Buffered items and bulk insert to MySQL using Scrapy


I'm scraping a large number of items, and my pipeline inserts them into the database one by one. It takes a long time.

So, every time the pipeline receives an item, it inserts it into the database, which is not an efficient approach. I'm looking for a way to buffer the pipeline items and, for example, execute a bulk insert once 1000 items have been received. How can I achieve that?

My current pipeline:

def __init__(self, **kwargs):
    self.cnx = self.mysql_connect()

def open_spider(self, spider):
    print("spider open")

def process_item(self, item, spider):
    print("Saving item into db ...")
    self.save(dict(item))
    return item

def close_spider(self, spider):
    self.mysql_close()

def mysql_connect(self):
    try:
        return mysql.connector.connect(**self.conf)
    except mysql.connector.Error as err:
        if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
            print("Something is wrong with your user name or password")
        elif err.errno == errorcode.ER_BAD_DB_ERROR:
            print("Database does not exist")
        else:
            print(err)


def save(self, row): 
    cursor = self.cnx.cursor()
    
    cursor.execute("SELECT DISTINCT product_id FROM products;")
    existing_ids = [row[0] for row in cursor.fetchall()]
    create_query = ("INSERT INTO "   self.table   
        "(rowid, date, listing_id, product_id, product_name, price, url) "
        "VALUES (%(rowid)s, %(date)s, %(listing_id)s, %(product_id)s, %(product_name)s, %(price)s, %(url)s)")

    # Insert new row
    cursor.execute(create_query, row)
    lastRecordId = cursor.lastrowid

    # Make sure data is committed to the database
    self.cnx.commit()
    cursor.close()
    print("Item saved with ID: {}" . format(lastRecordId))

    product_id = row['product_id']
    if product_id not in existing_ids:
        create_query = ("INSERT INTO " + self.table2 +
            " (product_rowid, date, listing_id, product_id, product_name, price, url) "
            "VALUES (%(rowid)s, %(date)s, %(listing_id)s, %(product_id)s, %(product_name)s, %(price)s, %(url)s)")
        new_cursor = self.cnx.cursor()
        new_cursor.execute(create_query, row)
        lastRecordId = new_cursor.lastrowid

        self.cnx.commit()
        new_cursor.close()
        print("New Item saved with ID: {}" . format(lastRecordId))
    

def mysql_close(self):
    self.cnx.close()

CodePudding user response:

You can add a cache in the pipeline constructor, have a method store items in it as they are processed, and run the bulk save once the number of cached rows reaches a threshold. Then, when the spider closes, it can bulk-save whatever is left in the cache that hasn't been saved yet.

I created the example below to demonstrate this strategy. I didn't make any alterations to your SQL code, so the items are still saved to the database one by one, just all at the same time. I'm sure there is room for further performance improvements with some changes to your SQL code as well (a sketch of one such change, using executemany, follows the example).

import mysql.connector
from mysql.connector import errorcode


class Pipeline:
    def __init__(self, **kwargs):
        self._rows = []   #  store rows temporarily
        self._cached_rows = 0    # number of cached rows
        self._cache_limit = 1000   # limit before saving to database
        self.cnx = self.mysql_connect()

    def open_spider(self, spider):
        print("spider open")

    def save_all(self):    # calls self.save method for all cached rows
        if len(self._rows) > 0:
            list(map(self.save, self._rows))
            self._cached_rows = 0   # reset the count
            self._rows = []         # reset the cache

    def cache_result(self, item):  # adds new row to cache
        self._rows.append(dict(item))
        self._cached_rows += 1
        if self._cached_rows >= self._cache_limit: # checks if limit reached
            self.save_all()      # if it has been reached then save all rows

    def process_item(self, item, spider):
        print("Saving item into db ...")
        self.cache_result(item)    # cache this item
        return item

    def close_spider(self, spider):
        self.save_all()      # Saves remaining rows once spider closes
        self.cnx.close()

    def mysql_connect(self):
        try:
            return mysql.connector.connect(**self.conf)
        except mysql.connector.Error as err:
            if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
                print("Something is wrong with your user name or password")
            elif err.errno == errorcode.ER_BAD_DB_ERROR:
                print("Database does not exist")
            else:
                print(err)


    def save(self, row):
        cursor = self.cnx.cursor()
        cursor.execute("SELECT DISTINCT product_id FROM products;")
        existing_ids = [row[0] for row in cursor.fetchall()]
        create_query = ("INSERT INTO "   self.table  
            "(rowid, date, listing_id, product_id, product_name, price, url) "
            "VALUES (%(rowid)s, %(date)s, %(listing_id)s, %(product_id)s, %(product_name)s, %(price)s, %(url)s)")

        # Insert new row
        cursor.execute(create_query, row)
        lastRecordId = cursor.lastrowid

        # Make sure data is committed to the database
        self.cnx.commit()
        cursor.close()
        print("Item saved with ID: {}" . format(lastRecordId))

        product_id = row['product_id']
        if product_id not in existing_ids:
            create_query = ("INSERT INTO " + self.table2 +
                " (product_rowid, date, listing_id, product_id, product_name, price, url) "
                "VALUES (%(rowid)s, %(date)s, %(listing_id)s, %(product_id)s, %(product_name)s, %(price)s, %(url)s)")
            new_cursor = self.cnx.cursor()
            new_cursor.execute(create_query, row)
            lastRecordId = new_cursor.lastrowid
            self.cnx.commit()
            new_cursor.close()
            print("New Item saved with ID: {}".format(lastRecordId))
