Home > OS >  Downloading meta content for around 15000 URL Python - threading
Downloading meta content for around 15000 URL Python - threading

Time:09-23

I have around 30000 Urls in my csv. I need to check if it has meta content is present or not, for each url. I am using request_cache to basically cache the response to a sqlite db. It was taking about 37hrs even after using a caching sys. Therefore I moved to concurrency. I think I have done something wrong with out = executor.map(download_site, sites, headers). And do not know how to fix it.

AttributeError: 'str' object has no attribute 'items'

import concurrent.futures
import requests
import threading
import time
import pandas as pd
import requests_cache
from PIL import Image
from io import BytesIO

thread_local = threading.local()

df = pd.read_csv("test.csv")

sites = []
for row in df['URLS']:
    sites.append(row)

# print("URL is shortened")

user_agent = 'Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.9.0.7) Gecko/2009021910 Firefox/3.0.7'
headers={'User-Agent':user_agent,}

requests_cache.install_cache('network_call', backend='sqlite', expire_after=2592000)


def getSess():
    if not hasattr(thread_local, "session"):
        thread_local.session = requests.Session()
    return thread_local.session

def networkCall(url, headers):
    print("In Download site")
    session = getSess()
    with session.get(url, headers=headers) as response:
        print(f"Read {len(response.content)} from {url}")
        return response.content

out = []
def getMeta(meta_res):
    print("Get data")
    for each in meta_res:
        meta = each.find_all('meta')
        for tag in meta:
            if 'name' in tag.attrs.keys() and tag.attrs['name'].strip().lower() in ['description', 'keywords']:
                content = tag.attrs['content']
                if content != '':
                    out.append("Absent")
                else:
                    out.append("Present")
    return out


def allSites(sites):
    with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:
        out = executor.map(networkCall, sites, headers)
        return list(out)


if __name__ == "__main__":
    sites = [
    "https://www.jython.org",
    "http://olympus.realpython.org/dice",
    ] * 15000
    start_time = time.time()
    list_meta = allSites(sites)
    print("META   ", list_meta)
    duration = time.time() - start_time
    print(f"Downloaded {len(sites)} in {duration} seconds")
    output = getMeta(list_meta)
    df["is it there"] = pd.Series(output)
    df.to_csv('new.csv',index=False, header=True)

CodePudding user response:

I have tried to emulate your functionality. The following code executes in under 4 minutes:-

from bs4 import BeautifulSoup as BS
import concurrent.futures
import time
import queue
import requests


URLs = [
    "https://www.jython.org",
    "http://olympus.realpython.org/dice"
] * 15_000

user_agent = 'Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.9.0.7) Gecko/2009021910 Firefox/3.0.7'
headers = {'User-Agent': user_agent}


class SessionCache():
    def __init__(self, cachesize=20):
        self.cachesize = cachesize
        self.sessions = 0
        self.q = queue.Queue()

    def getSession(self):
        try:
            return self.q.get(block=False)
        except queue.Empty:
            pass
        if self.sessions < self.cachesize:
            self.q.put(requests.Session())
            self.sessions  = 1
        return self.q.get()

    def putSession(self, session):
        self.q.put(session)


CACHE = SessionCache()


def doGet(url):
    try:
        session = CACHE.getSession()
        response = session.get(url, headers=headers)
        response.raise_for_status()
        soup = BS(response.text, 'lxml')
        for meta in soup.find_all('meta'):
            if (name := meta.attrs.get('name', None)):
                if name.strip().lower() in ['description', 'keywords']:
                    if meta.attrs.get('content', '') != '':
                        return url, 'Present'
        return url, 'Absent'
    except Exception as e:
        return url, str(e)
    finally:
        CACHE.putSession(session)


def main():
    start = time.perf_counter()
    with concurrent.futures.ThreadPoolExecutor() as executor:
        for r in executor.map(doGet, URLs):
            print(f'{r[0]} -> {r[1]}')
    end = time.perf_counter()
    print(f'Duration={end-start:.4f}s')


if __name__ == '__main__':
    main()

CodePudding user response:

This error is happening in requests.models.PrepareRequest.prepare_headers(). When you call executor.map(networkCall, sites, headers), it's casting headers to a list, so you end up with request.headers = 'User-Agent' instead of request.headers = {'User-Agent': '...'}.

Since it looks like the headers aren't actually changing, you can make that a constant and remove it as an argument from networkCall():

HEADERS = {'User-Agent':user_agent}
...

def networkCall(url):
    session = getSess()
    with session.get(url, headers=HEADERS) as response:
        print(f"Read {len(response.content)} from {url}")
        return response.content
...

def allSites(sites):
    with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:
        out = executor.map(networkCall, sites)
        return list(out)

Another thing worth noting is that requests_cache.install_cache() is not thread-safe, which causes the sqlite3.OperationalError you got earlier. You can remove install_cache() and use requests_cache.CachedSession instead, which is thread-safe:

def getSess():
    if not hasattr(thread_local, "session"):
        thread_local.session = requests_cache.CachedSession(
            'network_call',
            backend='sqlite',
            expire_after=2592000,
        )
    return thread_local.session

For reference, there's more info in the requests-cache user guide on the differences between sessions and patching.

  • Related