Python Multithreading: Check active domains with multithreading in python optimally


The idea of the program is to check which of the domains/subdomains listed in the subdomains.txt file are live (over the HTTP/HTTPS protocol).

I do this by sending a HEAD request to each domain/subdomain and reading the response status code. If a status code comes back at all, the domain or subdomain is considered live (the load_url_http function).

To speed up the program, I used concurrent.futures.ThreadPoolExecutor with 200 threads. However, even after increasing the number of threads to 300, the program doesn't get much faster.

I want to improve my program so that it can send thousands of requests at once. Below is part of my source code:

python-request-multil.py

import time

import requests
import concurrent.futures


def load_url_http(protocol: str, domain: str, timeout: int = 10):
    try:
        conn = requests.head(protocol + "://" + domain, timeout=timeout)
        return conn.status_code
    except Exception:
        return None


#--- main ---#
start_time = time.time()

worker = 400
protocol = "http"
timeout = 10

print("Number of worker:", worker)

with concurrent.futures.ThreadPoolExecutor(max_workers=worker) as executor:
    # file object that live subdomains will be written to
    file_live_subdomain = open("live_subdomains.txt", "a")
    
    # load domain/subdomain list from file
    URLS = open("subdomains.txt", "r").read().split("\n")
    URLS_length = len(URLS)
    
    # Count the number of live subdomains
    live_count = 0
    
    # Start the load operations and mark each future with its URL
    future_to_url = {
        executor.submit(load_url_http, protocol, url, timeout): url for url in URLS
    }
    
    for i, future in zip(range(URLS_length), concurrent.futures.as_completed(future_to_url)):
        url = future_to_url[future]
        print(f"\r-->  Checking live subdomain.........{i 1}/{URLS_length}", end="")
        try:
            data = future.result()
            
            # If `load_url_http` returns any status code
            if data is not None:
                # print(f'{protocol}://{url}:{data}')
                live_count = live_count + 1
                file_live_subdomain.write(f"\n{protocol}://" + url)
        except Exception as exc:
            print(exc)
    print(f"\n[ ] Live domain: {live_count}/{URLS_length}", end="")
    file_live_subdomain.close()

print("\n--- %s seconds ---" % (time.time() - start_time))

Run:

┌──(quangtb㉿QuangTB)-[/mnt/e/DATA/Downloads]
└─$ python3 python-request-multil.py
Number of worker: 100
-->  Checking live subdomain.........1117/1117
[+] Live domain: 344/1117
--- 67.41670227050781 seconds ---

┌──(quangtb㉿QuangTB)-[/mnt/e/DATA/Downloads]
└─$ python3 python-request-multil.py
Number of worker: 200
-->  Checking live subdomain.........1117/1117
[+] Live domain: 344/1117
--- 54.6825795173645 seconds ---

┌──(quangtb㉿QuangTB)-[/mnt/e/DATA/Downloads]
└─$ python3 python-request-multil.py
Number of worker: 300
-->  Checking live subdomain.........1117/1117
[+] Live domain: 339/1117
--- 54.186068058013916 seconds ---

┌──(quangtb㉿QuangTB)-[/mnt/e/DATA/Downloads]
└─$ python3 python-request-multil.py
Number of worker: 400
-->  Checking live subdomain.........1117/1117
[+] Live domain: 344/1117
--- 54.19181728363037 seconds ---

CodePudding user response:

In Python, multithreading doesn't actually run code in parallel: because of the GIL (Global Interpreter Lock), all the threads run inside one process and only one of them can execute Python bytecode at any moment, so effectively that process uses only one CPU core.

You can create as many threads as you want and it won't solve the problem; it can actually make things worse. Those 300 threads all share that one core, and a core can only run one thing at a time, so it ends up running a few commands on one thread, switching to another thread, running a few commands there, switching again, and so on. Switching between threads costs resources, and while the switch happens your program code isn't running. So if you open too many threads, the core spends more time switching between them than executing your program.
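
To make that concrete, here is a minimal, self-contained sketch (not from the original post) that times the same amount of pure-Python, CPU-bound work run sequentially and then split across four threads; on CPython the threaded version is not meaningfully faster, because only one thread can hold the GIL at a time:

import threading
import time


def cpu_bound_work(n: int = 5_000_000):
    # pure Python loop, so the GIL is held the whole time
    total = 0
    for i in range(n):
        total += i
    return total


# run the work four times sequentially
start = time.time()
for _ in range(4):
    cpu_bound_work()
print("sequential:", time.time() - start)

# run the same four jobs in four threads
start = time.time()
threads = [threading.Thread(target=cpu_bound_work) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print("threaded:  ", time.time() - start)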

What you can do to actually run your code simultaneously is to open a few processes instead of threads, using the multiprocessing library; your code will then run on several cores. The same advice applies here: don't open hundreds of processes, open only a few. I recommend opening one process per CPU core, and the multiprocessing library has a built-in function that returns the number of cores your CPU has:

import multiprocessing
print(multiprocessing.cpu_count())

Note that, because the processes really do run your code simultaneously, their prints can interfere with each other, so you will need a multiprocessing.Lock() and do something like this:

import multiprocessing

lock = multiprocessing.Lock()

lock.acquire()
print("something")
lock.release()

Call lock.acquire() before each print and lock.release() after it (if you don't release the lock, your program will get stuck); this makes sure your prints won't get mixed into each other.
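
As a small follow-up, the lock can also be used as a context manager, which releases it automatically even if the print raises, so you can't forget the release:

import multiprocessing

lock = multiprocessing.Lock()

# the `with` block acquires the lock on entry and releases it on exit,
# even if the body raises an exception
with lock:
    print("something")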


EDIT:

In your case, because of the timeout, it will actually be better to open a few processes and run something like 20 threads inside each one.

If each process waits up to 10 seconds on every failed address, it will end up slower than simply opening a lot of threads.

So for your case, the fastest approach I can think of is to open a few processes and, in each process, 20 to 30 threads.

You can try something like this:

import multiprocessing
import random
import threading
import time
import requests


MAX_NUMBER_OF_PROCESSES = multiprocessing.cpu_count()
MAX_NUMBER_OF_THREADS_IN_EACH_PROCESS = 30
PROTOCOL = "http"
TIMEOUT = 4
START_TIME = time.time()


def load_url_http(protocol: str, domains: list[str], timeout: int = 10):
    with open("live_subdomains.txt", "a") as live_domains_file:
        for domain in domains:
            try:
                conn = requests.head(protocol + "://" + domain, timeout=timeout)
                if conn.status_code is not None:
                    live_domains_file.write(f"{protocol}://{domain}\n")
            except Exception:
                pass
    return


def create_threads_for_process(protocol: str, domains: list[str], timeout: int = 10):
    # create threads
    threads_list = []
    threads_urls = {}
    start = 0
    number_of_threads_to_open = len(domains) if len(domains) < MAX_NUMBER_OF_THREADS_IN_EACH_PROCESS \
        else MAX_NUMBER_OF_THREADS_IN_EACH_PROCESS
    for i in range(1, number_of_threads_to_open + 1):
        # distribute the work of this process evenly between all the threads
        if i != number_of_threads_to_open:
            threads_urls[i] = domains[start:(len(domains) // number_of_threads_to_open) * i]
            start = (len(domains) // number_of_threads_to_open) * i
        else:
            threads_urls[i] = domains[start:]
        # create and start the thread
        thread = threading.Thread(target=load_url_http,
                                  args=(protocol, threads_urls[i], timeout,),
                                  daemon=True)
        thread.start()
        threads_list.append(thread)
    # wait for all threads to finish
    for thread in threads_list:
        thread.join()


def main():
    with open("live_subdomains.txt", "w") as file:
        file.write("")
    with open("subdomains.txt", "r") as file:
        urls = file.read().split("\n")
    random.shuffle(urls)  # shuffle the urls list
    # create the processes
    processes_list = []
    processes_urls = {}
    start = 0
    number_of_processes_to_open = len(urls) if len(urls) < MAX_NUMBER_OF_PROCESSES else MAX_NUMBER_OF_PROCESSES
    for i in range(1, number_of_processes_to_open + 1):
        if i != number_of_processes_to_open:
            # give each process an even amount of work
            processes_urls[i] = urls[start:(len(urls) // number_of_processes_to_open) * i]
            start = (len(urls) // number_of_processes_to_open) * i
        else:
            # the last process will get a bit more / a bit less in
            # case len(urls) isn't dividable by number_of_processes_to_open
            processes_urls[i] = urls[start:]
        # create the process, start it and add to processes list
        process = multiprocessing.Process(target=create_threads_for_process,
                                          args=(PROTOCOL, processes_urls[i], TIMEOUT,),
                                          daemon=True)
        process.start()
        processes_list.append(process)
    # wait for all processes to finish
    while multiprocessing.active_children():
        time.sleep(0.8)
    # print result
    with open("live_subdomains.txt", "r") as live_urls_file:
        live_count = len(live_urls_file.read().split("\n")) - 1  # -1 empty line at the end
    print(f"\n[ ] Live domain: {live_count}/{len(urls)}", end="")
    print("\n--- %s seconds ---" % (time.time() - START_TIME))


if __name__ == '__main__':
    main()

In this code I open one process for each core the CPU has, and in each process I open 30 threads. Also, you don't need to wait 10 seconds for a reply; something like 5 seconds is enough, especially when you use HEAD and not GET.
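
One more note on the timeout (not used in the code above): requests also accepts a (connect, read) timeout tuple, so a hypothetical variant could give up quickly on hosts that never accept the connection while still allowing a few seconds for the HEAD response itself:

import requests

# hypothetical values: 3 seconds to establish the TCP connection,
# up to 5 seconds to receive the HEAD response
try:
    conn = requests.head("http://example.com", timeout=(3, 5))
    print(conn.status_code)
except requests.RequestException:
    print("no response")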

I ran your code and compared it to mine: your code took 35 seconds to finish, mine took 25 seconds, and that is on 1117 URLs; the bigger the URL list, the more significant the difference will be.
