Scrape dynamic table with pagination while preserving links

Time:11-15

I am a beginner Python programmer trying to scrape a dynamic table (datatable) that has pagination. There are "first" and "previous" pagination buttons indexed "0" and "1" respectively, followed by numbered buttons (see attached pic), so I want to start with button 1 indexed as "2" and then iterate through the pages until I capture the entire table with all of the links intact.

<a href="#" aria-controls="datatable" data-dt-idx="2" tabindex="0">1</a>

I managed to scrape the info for the first ten table rows, but I don't know how to advance to capture the rest of the pages. I think I need to loop through those pagination buttons somehow. After reading countless tutorials and Stack Overflow questions and watching several YouTube videos, I managed to cobble together the following code. However, I ended up with the HTML for the whole page, not just my table, and only retrieved the first 10 rows of the table, which were on the first page.

from bs4 import BeautifulSoup
import pandas as pd
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as ec
from selenium.webdriver.chrome.options import Options

chrome_options = Options()

chrome_options.add_argument('--headless')
driver = webdriver.Chrome(r"C:\Users\MyName\chromedriver", options=chrome_options)

url = "https://www.fda.gov/inspections-compliance-enforcement-and-criminal-investigations/compliance-actions-and-activities/warning-letters"
driver.get(url)

table_confirm = WebDriverWait(driver, 20).until(
  ec.presence_of_element_located((By.ID, "datatable"))
)
page_source = driver.page_source
soup = BeautifulSoup(page_source,'lxml')

print(soup)

data = []
table = soup.find('table', {'class':'lcds-datatable table table-bordered cols-8 responsive-enabled dataTable no-footer dtr-inline collapsed'})
table_body = table.find('tbody')
rows = table_body.find_all('tr')
for row in rows:
    cols = row.find_all('td')
    cols = [ele.text.strip() for ele in cols]
    data.append([ele for ele in cols if ele])

Can someone please help me out? Thanks.

[1]: https://i.stack.imgur.com/RUsui.png

CodePudding user response:

If you view the page in a browser and use its developer tools to log network traffic while navigating through the pages, you'll see that every page change triggers an XHR (XMLHttpRequest) HTTP GET request to a REST API. The response is JSON and contains all the information you're trying to scrape. This JSON is normally parsed and used to populate the DOM asynchronously using JavaScript.

To get the data you're looking for, all you have to do is imitate that request. Selenium is overkill for this - all you need is requests. You can even tweak the request a bit to suit your needs. For example, by default, the request initiated by the page will only grab the next 10 results/entries. I've changed the request to grab 100 at a time, but there's really no advantage or disadvantage here.

def make_pretty(entry):
    import re

    pattern = ">([^<]*)<"

    return {
        "posted_date": re.search(pattern, entry[0]).group(1),
        "letter_issue_date": re.search(pattern, entry[1]).group(1),
        "company_name": re.search(pattern, entry[2]).group(1),
        "issuing_office": entry[3],
        "subject": entry[4],
        "response_letter": entry[5],
        "closeout_letter": entry[6]
    }
    

def get_entries():
    import requests
    from itertools import count

    url = "https://www.fda.gov/datatables/views/ajax"

    group_length = 100

    params = {
        "length": group_length,
        "view_display_id": "warning_letter_solr_block",
        "view_name": "warning_letter_solr_index",
    }

    headers = {
        "user-agent": "Mozilla/5.0",
        "x-requested-with": "XMLHttpRequest"
    }

    for current_group in count(0):
        start = current_group * group_length
        end = ((current_group + 1) * group_length) - 1
        params["start"] = start
        
        response = requests.get(url, params=params, headers=headers)
        response.raise_for_status()
        data = response.json()
        if not data["data"]:
            break
        yield from map(make_pretty, data["data"])

        print("yielding {}-{}".format(start, min(end, data["recordsFiltered"])))

def main():
    global all_entries
    all_entries = list(get_entries())
    print("Total number of entries: {}".format(len(all_entries)))

    return 0


if __name__ == "__main__":
    import sys
    sys.exit(main())

Output:

yielding 0-99
yielding 100-199
yielding 200-299
yielding 300-399
yielding 400-499
yielding 500-599
yielding 600-699
yielding 700-799
yielding 800-899
yielding 900-999
yielding 1000-1099
yielding 1100-1199
yielding 1200-1299
yielding 1300-1399
yielding 1400-1499
yielding 1500-1599
yielding 1600-1699
yielding 1700-1799
yielding 1800-1899
yielding 1900-1999
yielding 2000-2099
yielding 2100-2199
yielding 2200-2299
yielding 2300-2399
yielding 2400-2499
yielding 2500-2599
yielding 2600-2658
Total number of entries: 2658
all_entries[0]
{'posted_date': '11/10/2021', 'letter_issue_date': '11/10/2021', 'company_name': 'Wyoming Vapor Company', 'issuing_office': 'Center for Tobacco Products', 'subject': 'Family Smoking Prevention and Tobacco Control Act/Adulterated/Misbranded', 'response_letter': '', 'closeout_letter': ''}

get_entries is a generator that makes requests to the REST API and yields individual entries until there are no more entries.
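
The stop-on-empty-page pattern that get_entries relies on can be tried offline. The sketch below is only an illustration: paginate, fake_fetch and FAKE_ROWS are hypothetical names that do not appear in the script above, and fake_fetch stands in for the real HTTP request by serving slices of a local list.

```python
from itertools import count


def paginate(fetch_page, page_size):
    """Yield rows one at a time until fetch_page returns an empty page."""
    for page_index in count(0):
        rows = fetch_page(start=page_index * page_size, length=page_size)
        if not rows:
            # An empty page means we have walked past the last record.
            break
        yield from rows


# Hypothetical stand-in for the HTTP call: serves slices of a local list.
FAKE_ROWS = [f"row {i}" for i in range(25)]


def fake_fetch(start, length):
    return FAKE_ROWS[start:start + length]


# Consuming the generator drives the loop: three "requests" return rows,
# and a fourth returns an empty list, which ends the iteration.
all_rows = list(paginate(fake_fetch, page_size=10))
print(len(all_rows))
```

Because the generator is lazy, no page is fetched until something iterates over it, which is why main wraps the call in list().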

make_pretty is used to make the individual entries that we're yielding in get_entries "pretty". From the JSON we receive, each "entry" corresponds to a list of strings, where some strings are HTML. make_pretty just naively parses those HTML strings in each entry, and returns a dictionary with key-value pairs for each entry, which is a bit cleaner to work with.
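
To see what "naively" means here, this small sketch runs the same pattern against a few made-up cell strings (they are assumptions for illustration, not real API output). first_text is a hypothetical helper that also falls back to the raw string when nothing matches, which the make_pretty above does not do.

```python
import re

pattern = ">([^<]*)<"


def first_text(cell):
    """Return the text between the first '>' and the next '<', or the cell itself."""
    match = re.search(pattern, cell)
    return match.group(1) if match else cell


# A simple anchor: the text between the tags is captured.
print(first_text('<a href="/example/path">Example Company</a>'))

# Nested tags: the first '>' is followed directly by '<', so the capture is empty.
print(repr(first_text('<a href="/x"><span>Acme</span></a>')))

# No tags at all: re.search finds nothing, so the fallback returns the input.
print(first_text("no tags"))
```

The second and third cases are where the regular expression approach becomes fragile, and an HTML parser is the safer choice.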

main is the main entry point of the script. We invoke get_entries and consume all items from the generator, letting the entries accumulate in the all_entries list. I've only added the global all_entries line so that I can play around with and inspect all_entries in the Python shell after the script has ended - it isn't required.

Take a look at this other answer I posted to a similar question, where I go more in-depth about using your browser's developer tools, logging network traffic, finding and imitating XHR requests, and inspecting the response.


EDIT: Here is the updated code:

keys = (
    "posted_date",         # entry[0]
    "letter_issue_date",   # entry[1]
    "company_name",        # entry[2]
    "company_url",         # entry[2]
    "issuing_office",      # entry[3]
    "subject",             # entry[4]
    "response_letter_url", # entry[5]
    "closeout_letter_url"  # entry[6]
)


def make_pretty(entry):
    from bs4 import BeautifulSoup as Soup
    import re

    pattern = "[^<]*"
    
    return dict(zip(keys, [
        Soup(entry[0], "html.parser").text.strip(),
        Soup(entry[1], "html.parser").text.strip(),
        Soup(entry[2], "html.parser").text.strip(),
        entry[2] and "https://www.fda.gov" + Soup(entry[2], "html.parser").find("a")["href"],
        entry[3].strip(),
        re.search(pattern, entry[4]).group(),
        entry[5] and "https://www.fda.gov" + Soup(entry[5], "html.parser").find("a")["href"],
        entry[6] and "https://www.fda.gov" + Soup(entry[6], "html.parser").find("a")["href"]
    ]))
    

def get_entries():
    import requests
    from itertools import count

    url = "https://www.fda.gov/datatables/views/ajax"

    group_length = 100

    params = {
        "length": group_length,
        "view_display_id": "warning_letter_solr_block",
        "view_name": "warning_letter_solr_index",
    }

    headers = {
        "user-agent": "Mozilla/5.0",
        "x-requested-with": "XMLHttpRequest"
    }

    for current_group in count(0):
        start = current_group * group_length
        end = ((current_group + 1) * group_length) - 1
        params["start"] = start
        
        response = requests.get(url, params=params, headers=headers)
        response.raise_for_status()
        data = response.json()
        if not data["data"]:
            break
        yield from map(make_pretty, data["data"])


        print("yielding {}-{}".format(start, min(end, data["recordsFiltered"])))

def main():
    import csv

    with open("output.csv", "w", newline="", encoding="utf-8") as file:
        writer = csv.DictWriter(file, fieldnames=keys, quoting=csv.QUOTE_ALL)
        writer.writeheader()
        writer.writerows(get_entries())
    print("Done writing.")

    return 0


if __name__ == "__main__":
    import sys
    sys.exit(main())
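
One thing to watch in the link handling above: if a non-empty cell ever contained no <a> tag, find("a") would return None and the subscript would raise a TypeError. Below is a minimal, more defensive sketch of the same idea. It deliberately swaps BeautifulSoup for the standard library's html.parser so that it runs without third-party packages, and it uses urljoin instead of string concatenation. FirstLinkParser, split_cell and the sample cell are hypothetical and not part of the script above.

```python
from html.parser import HTMLParser
from urllib.parse import urljoin

BASE_URL = "https://www.fda.gov"


class FirstLinkParser(HTMLParser):
    """Collects the visible text and the first <a href> of an HTML fragment."""

    def __init__(self):
        super().__init__()
        self.href = ""
        self.text_parts = []

    def handle_starttag(self, tag, attrs):
        # Remember only the first link in the cell.
        if tag == "a" and not self.href:
            self.href = dict(attrs).get("href") or ""

    def handle_data(self, data):
        self.text_parts.append(data)


def split_cell(cell_html):
    """Return (text, absolute_url); the URL is "" when the cell has no link."""
    parser = FirstLinkParser()
    parser.feed(cell_html)
    parser.close()  # flush any text still buffered by the parser
    text = "".join(parser.text_parts).strip()
    url = urljoin(BASE_URL, parser.href) if parser.href else ""
    return text, url


# Made-up cell, for illustration only.
print(split_cell('<a href="/letters/example-co">Example Co</a>'))
print(split_cell("plain text"))
```

A helper like this returns both the label and the link from one pass over the cell, so a cell without a link yields an empty URL rather than an exception.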