concurrent.futures.ThreadPoolExecutor / Multithreading runs out of memory (Killed)

Time:09-30

I am currently working on a supposedly easy web scraping project while learning Python. I have a file of about 70MB containing a few million IP addresses (sys.argv[1]) that I want to process. Of course, not all of them are reachable.

I am trying to use concurrent.futures and am running into memory problems, which eventually lead to the whole process being killed.

Now, I have split my futures into two sets (done and not done) as suggested here. I am using about 100 workers (sys.argv[2]) and have 1GB of memory available.

I thought all done futures would be released once future.result() is called on them after 1000 or more futures are done. However, this only seems to slow the process down, and memory still fills up until the process is killed.

What am I missing here? Any suggestions on how to handle this?

Thank you in advance.

My code is as follows:

import sys
import requests
import concurrent.futures
import urllib3
from lxml.html import fromstring
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

def title(host):
    try:
        url = "https://" + host
        r = requests.get(url, headers={'User-Agent': 'Mozilla/5.0'}, timeout=3, verify=False)
        tree = fromstring(r.content.decode('utf-8'))
        title = tree.findtext('.//title')
        print(host + ": " + title)
    except:
        pass

max=int(sys.argv[2])
with concurrent.futures.ThreadPoolExecutor(max_workers=max) as executor:
    futures_done = set()
    futures_notdone = set()
    with open(sys.argv[1]) as f:
        for line in f:
            host = line.strip()
            futures_notdone.add(executor.submit(title, host))
            if len(futures_notdone) >= max:
                done, futures_notdone = concurrent.futures.wait(futures_notdone, return_when=concurrent.futures.FIRST_COMPLETED)
                futures_done.update(done)
            for future in futures_done:
                if len(futures_done) >= 1000:
                    future.result()

CodePudding user response:

It looks like you are storing the done futures in a set without ever clearing that set, so it can grow very large. This could be the cause of your memory problem. Calling .result() on a future does not release it, because the future is still referenced by the futures_done set.
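To make the difference visible, here is a minimal sketch that replaces the network request with a trivial stand-in. The names work, run_keeping_done and run_dropping_done are hypothetical helpers, not part of the question's code. The first function mimics the question's loop and reports how many finished futures are still referenced at the end; the second consumes finished futures immediately and reports the largest number of futures it ever tracked.

```python
import concurrent.futures


def work(n):
    # Hypothetical stand-in for the network request done by title().
    return n * 2


def run_keeping_done(items, workers):
    """Mimic the question's loop; return how many finished futures stay referenced."""
    futures_done = set()
    futures_notdone = set()
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
        for item in items:
            futures_notdone.add(executor.submit(work, item))
            if len(futures_notdone) >= workers:
                done, futures_notdone = concurrent.futures.wait(
                    futures_notdone,
                    return_when=concurrent.futures.FIRST_COMPLETED)
                # This set is never cleared, so it grows with the input size.
                futures_done.update(done)
    return len(futures_done)


def run_dropping_done(items, workers):
    """Same loop, but finished futures are read and dropped; return the peak tracked."""
    futures_notdone = set()
    peak = 0
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
        for item in items:
            futures_notdone.add(executor.submit(work, item))
            peak = max(peak, len(futures_notdone))
            if len(futures_notdone) >= workers:
                done, futures_notdone = concurrent.futures.wait(
                    futures_notdone,
                    return_when=concurrent.futures.FIRST_COMPLETED)
                for future in done:
                    # Read the result; no long-lived reference is kept.
                    future.result()
    return peak


if __name__ == "__main__":
    print("retained futures:", run_keeping_done(range(1000), 4))
    print("peak tracked futures:", run_dropping_done(range(1000), 4))
```

With a few million hosts, the first pattern keeps a few million Future objects alive, while the second never tracks more futures than there are workers.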

It is not perfect, but you can try the following. It keeps at most as many jobs scheduled as there are workers: it periodically gathers the finished jobs and schedules new ones in their place. The idea comes from this blog.

The drawback I see in this method is that it must repeatedly poll all scheduled jobs to find the ones that are done, which could be slow when the number of workers is large.

import sys
import requests
import concurrent.futures
import urllib3
from itertools import islice
from lxml.html import fromstring
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


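def title(host):
    """Fetch https://<host> and print the page title; unreachable hosts are skipped."""
    try:
        url = "https://" + host
        r = requests.get(url, headers={'User-Agent': 'Mozilla/5.0'}, timeout=3, verify=False)
        tree = fromstring(r.content.decode('utf-8'))
        page_title = tree.findtext('.//title')
        print(host + ": " + page_title)
    except Exception:
        # Best-effort scan: ignore unreachable hosts and pages without a usable title.
        pass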

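max_workers = int(sys.argv[2])

with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
    with open(sys.argv[1]) as f:
        # Lazily yield stripped host names; blank lines are skipped.
        hosts = (line.strip() for line in f if line.strip())
        # Prime the pool with at most max_workers jobs.
        futures = {executor.submit(title, host) for host in islice(hosts, max_workers)}

        while futures:
            # Block until at least one job finishes; keep only the unfinished ones.
            done, futures = concurrent.futures.wait(
                futures, return_when=concurrent.futures.FIRST_COMPLETED)

            for future in done:
                # title() prints its own output and returns None;
                # result() is only called to surface unexpected errors.
                future.result()

            # Schedule one new job for every finished one.
            for host in islice(hosts, len(done)):
                futures.add(executor.submit(title, host))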
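
If the polling in the loop above turns out to be a bottleneck, one alternative (a different technique, not the one from the blog) is to bound the number of in-flight jobs with a semaphore that each job releases through add_done_callback when it finishes. The sketch below is only an illustration under assumptions: run_bounded and its max_in_flight parameter are hypothetical names, and the function deliberately keeps no reference to any future, so a finished job can be garbage collected right away.

```python
import concurrent.futures
import threading


def run_bounded(fn, items, max_workers, max_in_flight=None):
    """Run fn(item) for every item with a bounded number of pending jobs.

    Hypothetical helper: return values and exceptions of fn are discarded,
    which matches a task like title() that prints its own output.
    """
    if max_in_flight is None:
        max_in_flight = max_workers * 2
    slots = threading.BoundedSemaphore(max_in_flight)

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        for item in items:
            # Blocks while max_in_flight jobs are queued or running.
            slots.acquire()
            try:
                future = executor.submit(fn, item)
            except BaseException:
                # Submission failed, so give the slot back before re-raising.
                slots.release()
                raise
            # Free the slot as soon as the job completes (or is cancelled).
            future.add_done_callback(lambda _f: slots.release())
    # Leaving the with block waits for the remaining jobs to finish.
```

For the scraper this could be called as run_bounded(title, hosts, max_workers), where hosts is a generator of stripped lines from the input file, so the file is read only as fast as the workers consume it.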