Python multiprocessing a class


I am trying to multiprocess Selenium, where each process is spawned with its own Selenium driver and session (each process is logged in with a different account).

I have a list of URLs to visit. Each URL needs to be visited exactly once by one of the accounts (it doesn't matter which one).

To avoid some nasty global-variable management, I tried to initialize each process with a class instance using the initializer argument of multiprocessing.Pool.

After that, I can't figure out how to distribute tasks to the processes, given that the function each process runs has to be a method of that class.

Here is a simplified version of what I'm trying to do:

from selenium import webdriver
import multiprocessing

account = [{'account': 1}, {'account': 2}]

class Collector():

    def __init__(self, account):

        self.account = account
        self.driver = webdriver.Chrome()

    def parse(self, item):

        self.driver.get(f"https://books.toscrape.com{item}")

if __name__ == '__main__':
    
    processes = 1
    pool = multiprocessing.Pool(processes, initializer=Collector, initargs=[account.pop()])

    items = ['/catalogue/a-light-in-the-attic_1000/index.html','/catalogue/tipping-the-velvet_999/index.html']
    
    pool.map(parse(), items, chunksize=1)

    pool.close()
    pool.join() 

The problem comes on the pool.map line: there is no reference to the instantiated object inside the subprocess. Another approach would be to distribute the URLs and parse them during __init__, but that would be very nasty.

Is there a way to achieve this?

CodePudding user response:

I'm not entirely certain if this solves your problem.

If you have one account per URL, then you could do this:

from selenium import webdriver
from multiprocessing import Pool

items = ['/catalogue/a-light-in-the-attic_1000/index.html',
         '/catalogue/tipping-the-velvet_999/index.html']
accounts = [{'account': 1}, {'account': 2}]
baseurl = 'https://books.toscrape.com'

def process(i, a):
    print(f'Processing account {a}')
    options = webdriver.ChromeOptions()
    options.add_argument('--headless')

    with webdriver.Chrome(options=options) as driver:
        driver.get(f'{baseurl}{i}')


def main():
    with Pool() as pool:
        pool.starmap(process, zip(items, accounts))


if __name__ == '__main__':
    main()

If the number of accounts doesn't match the number of URLs, then, since you have said it doesn't matter which account GETs which URL, you could simply pick an account at random with random.choice().
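For example, a minimal sketch of that pairing step, reusing the names from the snippet above (only the random account selection is new; the Selenium part is unchanged, so it is omitted here):

```python
import random

# Same shapes as in the answer above, but with more URLs than accounts.
items = ['/catalogue/a-light-in-the-attic_1000/index.html',
         '/catalogue/tipping-the-velvet_999/index.html',
         '/catalogue/soumission_998/index.html']
accounts = [{'account': 1}, {'account': 2}]

# Pair every URL with a randomly chosen account. With pool.starmap this
# replaces zip(items, accounts), which would silently drop the extra URLs
# once the shorter accounts list is exhausted.
tasks = [(item, random.choice(accounts)) for item in items]

for url, acct in tasks:
    print(f'{url} -> {acct}')
```

The resulting list of (url, account) tuples can be passed to pool.starmap(process, tasks) exactly as in the code above.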

CodePudding user response:

Since Chrome starts its own process, there is really no need to be using multiprocessing when multithreading will suffice. I would like to offer a more general solution for the case where you have N URLs to retrieve, where N might be very large, but you want to limit the number of concurrent Selenium sessions to MAX_DRIVERS, a significantly smaller number. You therefore want to create only one driver session per thread in the pool and reuse it as necessary. The remaining problem is calling quit on each driver when you are finished with the pool, so that you don't leave any Selenium processes running behind.

The following code uses thread-local storage, which is unique to each thread, to hold the current driver instance for each pool thread, and uses a class destructor (__del__) to call the driver's quit method when the class instance is destroyed:

from selenium import webdriver
from multiprocessing.pool import ThreadPool
import threading

items = ['/catalogue/a-light-in-the-attic_1000/index.html',
         '/catalogue/tipping-the-velvet_999/index.html']
accounts = [{'account': 1}, {'account': 2}]
baseurl = 'https://books.toscrape.com'

threadLocal = threading.local()

class Driver:
    def __init__(self):
        options = webdriver.ChromeOptions()
        options.add_argument("--headless")
        options.add_experimental_option('excludeSwitches', ['enable-logging'])
        self.driver = webdriver.Chrome(options=options)

    def __del__(self):
        self.driver.quit() # clean up driver when we are cleaned up
        print('The driver has been "quitted".')

    @classmethod
    def create_driver(cls):
        the_driver = getattr(threadLocal, 'the_driver', None)
        if the_driver is None:
            the_driver = cls()
            threadLocal.the_driver = the_driver
        return the_driver.driver


def process(i, a):
    print(f'Processing account {a}')
    driver = Driver.create_driver()
    driver.get(f'{baseurl}{i}')


def main():
    global threadLocal

    # We never want to create more than MAX_DRIVERS concurrent driver sessions:
    MAX_DRIVERS = 8  # rather arbitrary
    POOL_SIZE = min(len(accounts), MAX_DRIVERS)
    with ThreadPool(POOL_SIZE) as pool:
        pool.starmap(process, zip(items, accounts))
    # ensure the drivers are "quitted":
    del threadLocal
    import gc
    gc.collect() # a little extra insurance

if __name__ == '__main__':
    main()