I have around 30,000 URLs in my CSV, and for each URL I need to check whether its meta content is present or not. I am using requests_cache to cache the responses to a SQLite db, but even with caching it was taking about 37 hours, so I moved to concurrency. I think I have done something wrong with out = executor.map(networkCall, sites, headers) and I don't know how to fix it. I get:

AttributeError: 'str' object has no attribute 'items'
import concurrent.futures
import requests
import threading
import time
import pandas as pd
import requests_cache
from PIL import Image
from io import BytesIO

thread_local = threading.local()

df = pd.read_csv("test.csv")
sites = []
for row in df['URLS']:
    sites.append(row)
    # print("URL is shortened")

user_agent = 'Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.9.0.7) Gecko/2009021910 Firefox/3.0.7'
headers = {'User-Agent': user_agent}

requests_cache.install_cache('network_call', backend='sqlite', expire_after=2592000)

def getSess():
    if not hasattr(thread_local, "session"):
        thread_local.session = requests.Session()
    return thread_local.session

def networkCall(url, headers):
    print("In Download site")
    session = getSess()
    with session.get(url, headers=headers) as response:
        print(f"Read {len(response.content)} from {url}")
        return response.content

out = []

def getMeta(meta_res):
    print("Get data")
    for each in meta_res:
        meta = each.find_all('meta')
        for tag in meta:
            if 'name' in tag.attrs.keys() and tag.attrs['name'].strip().lower() in ['description', 'keywords']:
                content = tag.attrs['content']
                if content != '':
                    out.append("Absent")
                else:
                    out.append("Present")
    return out

def allSites(sites):
    with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:
        out = executor.map(networkCall, sites, headers)
        return list(out)

if __name__ == "__main__":
    sites = [
        "https://www.jython.org",
        "http://olympus.realpython.org/dice",
    ] * 15000
    start_time = time.time()
    list_meta = allSites(sites)
    print("META ", list_meta)
    duration = time.time() - start_time
    print(f"Downloaded {len(sites)} in {duration} seconds")
    output = getMeta(list_meta)
    df["is it there"] = pd.Series(output)
    df.to_csv('new.csv', index=False, header=True)
CodePudding user response:
I have tried to emulate your functionality. The following code executes in under 4 minutes:-
from bs4 import BeautifulSoup as BS
import concurrent.futures
import time
import queue
import requests

URLs = [
    "https://www.jython.org",
    "http://olympus.realpython.org/dice"
] * 15_000

user_agent = 'Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.9.0.7) Gecko/2009021910 Firefox/3.0.7'
headers = {'User-Agent': user_agent}

class SessionCache():
    def __init__(self, cachesize=20):
        self.cachesize = cachesize
        self.sessions = 0
        self.q = queue.Queue()

    def getSession(self):
        try:
            return self.q.get(block=False)
        except queue.Empty:
            pass
        if self.sessions < self.cachesize:
            self.q.put(requests.Session())
            self.sessions += 1
        return self.q.get()

    def putSession(self, session):
        self.q.put(session)

CACHE = SessionCache()

def doGet(url):
    try:
        session = CACHE.getSession()
        response = session.get(url, headers=headers)
        response.raise_for_status()
        soup = BS(response.text, 'lxml')
        for meta in soup.find_all('meta'):
            if (name := meta.attrs.get('name', None)):
                if name.strip().lower() in ['description', 'keywords']:
                    if meta.attrs.get('content', '') != '':
                        return url, 'Present'
        return url, 'Absent'
    except Exception as e:
        return url, str(e)
    finally:
        CACHE.putSession(session)

def main():
    start = time.perf_counter()
    with concurrent.futures.ThreadPoolExecutor() as executor:
        for r in executor.map(doGet, URLs):
            print(f'{r[0]} -> {r[1]}')
    end = time.perf_counter()
    print(f'Duration={end-start:.4f}s')

if __name__ == '__main__':
    main()
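If you still need to write the Present/Absent flags back to your CSV the way the original script does, one approach is to collect the (url, status) pairs into a dict and map it onto the DataFrame. A sketch, reusing doGet and URLs from the code above; the "URLS" and "is it there" column names and the file names are taken from the question:

import pandas as pd

def main():
    results = {}
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # doGet returns (url, status); a later result for a duplicate URL
        # simply overwrites the earlier one
        for url, status in executor.map(doGet, URLs):
            results[url] = status
    df = pd.read_csv("test.csv")
    df["is it there"] = df["URLS"].map(results)
    df.to_csv("new.csv", index=False, header=True)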
CodePudding user response:
This error is happening in requests.models.PreparedRequest.prepare_headers(). When you call executor.map(networkCall, sites, headers), the headers dict is treated as a second iterable to zip against sites, and iterating a dict yields its keys, so the call ends up with request.headers = 'User-Agent' instead of request.headers = {'User-Agent': '...'}.
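To make the zipping behaviour concrete, here is a minimal standalone sketch (not from the original post; the URLs are placeholders and nothing is actually fetched):

from concurrent.futures import ThreadPoolExecutor

headers = {'User-Agent': 'Mozilla/5.0'}
sites = ['https://a.example', 'https://b.example']

def show(url, hdrs):
    # hdrs is whatever map() pulled from the second iterable
    return f'{url} got headers={hdrs!r}'

with ThreadPoolExecutor() as executor:
    # map() zips sites with headers; iterating a dict yields its keys,
    # so only one call runs and it receives the string 'User-Agent'
    for line in executor.map(show, sites, headers):
        print(line)
# prints: https://a.example got headers='User-Agent'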
Since it looks like the headers aren't actually changing, you can make them a constant and remove the argument from networkCall():
HEADERS = {'User-Agent': user_agent}

...

def networkCall(url):
    session = getSess()
    with session.get(url, headers=HEADERS) as response:
        print(f"Read {len(response.content)} from {url}")
        return response.content

...

def allSites(sites):
    with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:
        out = executor.map(networkCall, sites)
        return list(out)
Another thing worth noting is that requests_cache.install_cache() is not thread-safe, which causes the sqlite3.OperationalError you got earlier. You can remove install_cache() and use requests_cache.CachedSession instead, which is thread-safe:
def getSess():
    if not hasattr(thread_local, "session"):
        thread_local.session = requests_cache.CachedSession(
            'network_call',
            backend='sqlite',
            expire_after=2592000,
        )
    return thread_local.session
For reference, there's more info in the requests-cache user guide on the differences between sessions and patching.
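To make the sessions-vs-patching distinction concrete, a minimal sketch (assuming requests-cache is installed; the cache name is arbitrary):

import requests
import requests_cache

# Patching: install_cache() monkey-patches requests globally, so every
# requests.get() in the process goes through the cache - shared global
# state that threads shouldn't be modifying concurrently.
requests_cache.install_cache('network_call', backend='sqlite')
requests.get('https://www.jython.org')   # transparently cached
requests_cache.uninstall_cache()         # back to plain requests

# Explicit session: the cache is scoped to this object, so each thread
# can own its own CachedSession (as in getSess() above) without any
# global state.
session = requests_cache.CachedSession('network_call', backend='sqlite')
session.get('https://www.jython.org')    # cached
requests.get('https://www.jython.org')   # not cached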