How can I make 100 threads write numbers increasingly in a single file?

Time:12-07

I know I should use .join(), and I already do. Here is the thing: I start a round of about 100 threads to perform some action, and after they complete, I start another 100 threads.

The context is that I am trying to check whether a range of ports on my PC is open, using threads. I start 100 threads, which together check 100 different ports, and each writes its result to a text file. The problem is that some of the ports are never written to the file, while others are. When I run the code below to scan ports 3000 to 4000, I expect the file to have 1000 lines, each stating whether a port is open or closed, but it ends up with around 930. Sometimes more, sometimes less, but never 1000 lines. Below this code is another thing I tried.

def check_range_ports(ip_host, initial_port, final_port):
    threads = []
    count = initial_port
    loop = 0
    number_of_threads = 100

    while count < final_port:

        if count + number_of_threads > final_port:
            number_of_threads = final_port - count + 1

        for i in range(count, count + number_of_threads):
            t = threading.Thread(target=check_port, args=(ip_host, i))
            t.daemon = True
            threads.append(t)

        for i in range(number_of_threads):
            threads[i].start()

        for i in range(number_of_threads):
            threads[i].join()

        count += number_of_threads
        loop += 1
        threads = []

def check_port(ip_host, port):
    try:
        time.sleep(0.5)
        my_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        my_socket.settimeout(5)
        result = my_socket.connect_ex((ip_host, port))
        with open("./ports.txt", "a+", encoding="utf-8") as f:
            if result == 0:
                f.write(f"Port {port} is open.\n")
            else:
                f.write(f"Port {port} is closed.\n")
        my_socket.close()

    except socket.timeout:
        print("Timeout on socket!")
        sys.exit()

    except socket.gaierror:
        print("Error on the host!")
        sys.exit()

    except KeyboardInterrupt:
        print("Exiting program!")
        sys.exit()

Here is another thing I tried. I created 10 threads, each of which created 100 more subthreads, and each subthread wrote one line to a file. It works better than the previous attempt, but I still can't get exactly 1000 lines, which is what I am aiming for.

Is what I'm trying to do doable? If so, how can I achieve it?

def start_threads(ip_host, initial_port, final_port):

    threads = []
    initial_port = 3000
    final_port = 4000
    number_of_ports_to_be_scanned = final_port - initial_port
    ip_host = 'XXX.XXX.X.XX'
    number_of_threads = 0

    if number_of_ports_to_be_scanned / 100 != 0:
        number_of_threads = int(number_of_ports_to_be_scanned / 100) + 1
    else:
        number_of_threads = number_of_ports_to_be_scanned / 100

    count = 0

    for i in range(number_of_threads):
        # if initial_port + count > final_port:
        #     number_of_threads = final_port - number_of_ports_to_be_scanned + 1

        t = threading.Thread(
            target=check_testing_port,
            args=(ip_host, initial_port + count, final_port)
        )
        # t.daemon = True
        t.start()
        threads.append(t)
        count += 100

    # for i in range(number_of_threads):
    #     threads[i].start()

    for i in range(number_of_threads):
        threads[i].join()

def check_testing_port(ip_host, port, final_port):

    sub_threads = []

    number_of_sub_threads = 100
    print(port)
    if port + 100 > final_port:
        number_of_sub_threads = port - final_port

    for i in range(port, port + number_of_sub_threads):
        t = threading.Thread(target=check_port, args=(ip_host, i))
        # t.daemon = True
        t.start()
        sub_threads.append(t)

    # for i in range(number_of_sub_threads):
    #     sub_threads[i].start()

    for i in range(number_of_sub_threads):
        sub_threads[i].join()

def check_port(ip_host, port):
    with open("./testing_ports.txt", "a", encoding="utf-8") as f:
        f.write(f"Port {port}" + "\n")

CodePudding user response:

In check_port you wrote

        with open("ports.txt", "a+") as f:
                f.write(...)

That is insane, in the sense that it is a critical section and you're not holding a lock: many threads append to the same file at the same time.

Acquire a mutex before messing with the file.
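
A minimal sketch of the mutex approach, assuming a module-level `threading.Lock`. The names `file_lock`, `record_result` and `run_demo` are hypothetical and not from the question, and the port check itself is left out so that only the locking is shown:

```python
import threading

# One lock shared by every worker thread (hypothetical name).
file_lock = threading.Lock()


def record_result(filename: str, port: int, is_open: bool) -> None:
    """Append one result line while holding the lock."""
    state = "open" if is_open else "closed"
    with file_lock:
        # Only one thread at a time can be inside this block.
        with open(filename, "a", encoding="utf-8") as f:
            f.write(f"Port {port} is {state}.\n")


def run_demo(filename: str, first_port: int, last_port: int) -> int:
    """Start one thread per port, wait for all of them, return the line count."""
    # Truncate the file first so the count reflects this run only.
    open(filename, "w", encoding="utf-8").close()
    threads = [
        threading.Thread(target=record_result, args=(filename, port, False))
        for port in range(first_port, last_port)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    with open(filename, encoding="utf-8") as f:
        return sum(1 for _ in f)
```

In the question's code, the equivalent change would be wrapping the `with open(...)` block of check_port in `with file_lock:`. The lines still land in completion order, not port order.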

Or write thread-specific files, which are subsequently combined into a single file.
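
A rough sketch of the thread-specific-files idea, assuming each thread handles a contiguous chunk of ports. `write_part` and `scan_to_single_file` are hypothetical names, and the line written per port is a placeholder for a real check:

```python
import os
import tempfile
import threading


def write_part(part_path: str, ports: range) -> None:
    """Each thread writes only to its own file, so no lock is needed."""
    with open(part_path, "w", encoding="utf-8") as f:
        for port in ports:
            # Placeholder line; a real scanner would write the check result.
            f.write(f"Port {port}\n")


def scan_to_single_file(output_path: str, first_port: int,
                        last_port: int, chunk: int = 100) -> None:
    """Write ports [first_port, last_port) via per-thread files, then merge."""
    with tempfile.TemporaryDirectory() as tmp_dir:
        jobs = []
        for start in range(first_port, last_port, chunk):
            ports = range(start, min(start + chunk, last_port))
            part_path = os.path.join(tmp_dir, f"part_{start}.txt")
            t = threading.Thread(target=write_part, args=(part_path, ports))
            t.start()
            jobs.append((t, part_path))

        # Merge in start-port order; join each thread before reading its file.
        with open(output_path, "w", encoding="utf-8") as out:
            for t, part_path in jobs:
                t.join()
                with open(part_path, encoding="utf-8") as part:
                    out.write(part.read())
```

Because the parts are merged in start-port order and each part is written sequentially, the combined file comes out in increasing port order.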

Better yet, tell all threads to write to a single Queue, and have just one thread read enqueued results and append them to a text file.

CodePudding user response:

You need to properly synchronise the access to the shared buffer you are writing to (the output file) concurrently. Only one thread at a time must write to the output file, otherwise you'll get a data race leading to the data corruption you observed.

You can ensure that only one thread is writing to the shared file by using a mutex, a queue or any other suitable concurrency primitive. Here is an example using a queue:

import threading
import time
from queue import Queue


# Sentinel object to signal end of writing
__END = object()


def check_range_ports(ip_host: str):
    threads: list[threading.Thread] = []
    number_of_threads = 100
    queue = Queue()

    # Start the compute threads
    for i in range(number_of_threads):
        t = threading.Thread(target=check_port, args=(ip_host, i, queue))
        t.start()
        threads.append(t)

    # Start the writer thread
    tw = threading.Thread(target=writer, args=("output.txt", queue))
    tw.start()
    
    # Wait for all compute threads to finish
    for i in range(number_of_threads):
        threads[i].join()
    
    # Signal to the writer loop to end
    queue.put(__END)
    
    # Wait for the writer thread to finish
    tw.join()


def check_port(ip_host: str, port: int, queue: Queue):
    time.sleep(0.5)
    
    # Enqueue the result to the synchronisation queue
    queue.put((ip_host, port))


def writer(filename: str, queue: Queue):
    with open(filename, "w") as f:
        while True:
            item = queue.get()
            
            # End the write loop if there are no more items
            # to process
            if item is __END:
                break
            
            # This write operation is sequential and thread-safe
            ip_host, port = item
            f.write(f"Port {port} of host {ip_host} is open.\n")


def main():
    check_range_ports("127.0.0.1")
    

if __name__ == "__main__":
    main()
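
Note that a queue hands results over in the order the threads finish, not in port order. If the lines should appear in increasing order, as the question title asks, one option is to collect all results first and sort them before writing. The following is only a sketch under that assumption: `scan_sorted` and `fake_probe` are hypothetical names, and `fake_probe` stands in for a real socket check.

```python
import threading
from queue import Queue
from typing import Callable, Iterable


def fake_probe(ip_host: str, port: int) -> bool:
    """Hypothetical stand-in for a socket check: even ports count as open."""
    return port % 2 == 0


def scan_sorted(ip_host: str, ports: Iterable[int], filename: str,
                probe: Callable[[str, int], bool] = fake_probe) -> None:
    """Probe every port in its own thread, then write results sorted by port."""
    results: Queue = Queue()

    def worker(port: int) -> None:
        # Queue.put is thread-safe, so workers need no extra locking.
        results.put((port, probe(ip_host, port)))

    threads = [threading.Thread(target=worker, args=(p,)) for p in ports]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    # Every worker has finished, so exactly len(threads) items are queued.
    collected = [results.get() for _ in range(len(threads))]
    collected.sort(key=lambda item: item[0])

    # A single thread writes the file, in increasing port order.
    with open(filename, "w", encoding="utf-8") as f:
        for port, is_open in collected:
            state = "open" if is_open else "closed"
            f.write(f"Port {port} is {state}.\n")
```

Sorting requires holding all results in memory until the scan ends, which is fine for a thousand ports but gives up the streaming behaviour of the writer thread.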