I know I should use .join(), and I am already using it, but here is the thing: I start a round of threads (about 100) to perform some action, and after they complete, I start another 100 threads.
The context is that I am trying to check whether a range of ports on my PC is open, using threads. I start 100 threads at a time; together they check 100 different ports, and each writes its result into a txt file. The problem is that some of the ports are not being written to the file, while others are. When I run the code below to scan ports 3000 to 4000, I want the file to end up with 1000 lines, each saying whether a port is open or closed, but it ends up with around 930. Sometimes more, sometimes less, but never 1000 lines. Below this code is another thing I tried.
import socket
import sys
import threading
import time

def check_range_ports(ip_host, initial_port, final_port):
    threads = []
    count = initial_port
    loop = 0
    number_of_threads = 100
    while count < final_port:
        if count + number_of_threads > final_port:
            number_of_threads = final_port - count + 1
        for i in range(count, count + number_of_threads):
            t = threading.Thread(target=check_port, args=(ip_host, i))
            t.daemon = True
            threads.append(t)
        for i in range(number_of_threads):
            threads[i].start()
        for i in range(number_of_threads):
            threads[i].join()
        count += number_of_threads
        loop += 1
        threads = []

def check_port(ip_host, port):
    try:
        time.sleep(0.5)
        my_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        my_socket.settimeout(5)
        result = my_socket.connect_ex((ip_host, port))
        with open("./ports.txt", "a+", encoding="utf-8") as f:
            if result == 0:
                f.write(f"Port {port} is open.\n")
            else:
                f.write(f"Port {port} is closed.\n")
        my_socket.close()
    except socket.timeout:
        print("Timeout on socket!")
        sys.exit()
    except socket.gaierror:
        print("Error on the host!")
        sys.exit()
    except KeyboardInterrupt:
        print("Exiting program!")
        sys.exit()
Here is another thing I tried: I created 10 threads, and each of those threads created 100 more subthreads, and each subthread would write a line to a file. It works better than the previous version, but I still can't get exactly 1000 lines, which is what I am aiming for.
Is what I'm trying to do doable? If so, how can I achieve it?
def start_threads(ip_host, initial_port, final_port):
    threads = []
    initial_port = 3000
    final_port = 4000
    number_of_ports_to_be_scanned = final_port - initial_port
    ip_host = 'XXX.XXX.X.XX'
    number_of_threads = 0
    if number_of_ports_to_be_scanned % 100 != 0:
        number_of_threads = int(number_of_ports_to_be_scanned / 100) + 1
    else:
        number_of_threads = number_of_ports_to_be_scanned // 100
    count = 0
    for i in range(number_of_threads):
        # if initial_port + count > final_port:
        #     number_of_threads = final_port - number_of_ports_to_be_scanned + 1
        t = threading.Thread(
            target=check_testing_port,
            args=(ip_host, initial_port + count, final_port)
        )
        # t.daemon = True
        t.start()
        threads.append(t)
        count += 100
    # for i in range(number_of_threads):
    #     threads[i].start()
    for i in range(number_of_threads):
        threads[i].join()

def check_testing_port(ip_host, port, final_port):
    sub_threads = []
    number_of_sub_threads = 100
    print(port)
    if port + 100 > final_port:
        number_of_sub_threads = final_port - port
    for i in range(port, port + number_of_sub_threads):
        t = threading.Thread(target=check_port, args=(ip_host, i))
        # t.daemon = True
        t.start()
        sub_threads.append(t)
    # for i in range(number_of_sub_threads):
    #     sub_threads[i].start()
    for i in range(number_of_sub_threads):
        sub_threads[i].join()

def check_port(ip_host, port):
    with open("./testing_ports.txt", "a", encoding="utf-8") as f:
        f.write(f"Port {port}\n")
CodePudding user response:
In check_port you wrote

with open("ports.txt", "a+") as f:
    f.write(...)

That is insane, in the sense that it is a critical section and you're not holding a lock.
Acquire a mutex before messing with the file.
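For example, here is a minimal sketch of the lock-based variant, assuming a module-level threading.Lock shared by all workers (the probe logic is copied from your check_port):

import socket
import threading

# One lock shared by every worker thread; all file writes happen
# while holding it, so lines can no longer interleave or get lost.
file_lock = threading.Lock()

def check_port(ip_host, port):
    my_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    my_socket.settimeout(5)
    result = my_socket.connect_ex((ip_host, port))
    my_socket.close()
    state = "open" if result == 0 else "closed"
    # Critical section: only one thread at a time appends to the file.
    with file_lock:
        with open("./ports.txt", "a", encoding="utf-8") as f:
            f.write(f"Port {port} is {state}.\n")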
Or write thread-specific files, which subsequently are combined into a single file.
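A sketch of that variant, assuming one file per port and a helper that merges them once every thread has been joined (the file names are placeholders):

import glob
import socket

def check_port(ip_host, port):
    my_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    my_socket.settimeout(5)
    result = my_socket.connect_ex((ip_host, port))
    my_socket.close()
    state = "open" if result == 0 else "closed"
    # One file per port, so no two threads ever share a file.
    with open(f"./ports_{port}.txt", "w", encoding="utf-8") as f:
        f.write(f"Port {port} is {state}.\n")

def combine_results(output="./ports.txt"):
    # Run this once, after every worker thread has been joined.
    with open(output, "w", encoding="utf-8") as out:
        for name in sorted(glob.glob("./ports_*.txt")):
            with open(name, encoding="utf-8") as part:
                out.write(part.read())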
Better yet, tell all threads to write to a single Queue, and have just one thread read enqueued results and append them to a text file.
CodePudding user response:
You need to properly synchronise access to the shared buffer you are writing to concurrently (the output file). Only one thread at a time may write to the output file; otherwise you get a data race, leading to the data corruption you observed.
You can ensure that only one thread is writing to the shared file by using a mutex, a queue or any other suitable concurrency primitive. Here is an example using a queue:
import threading
import time
from queue import Queue

# Sentinel object to signal end of writing
__END = object()

def check_range_ports(ip_host: str):
    threads: list[threading.Thread] = []
    number_of_threads = 100
    queue = Queue()
    # Start the compute threads
    for i in range(number_of_threads):
        t = threading.Thread(target=check_port, args=(ip_host, i, queue))
        t.start()
        threads.append(t)
    # Start the writer thread
    tw = threading.Thread(target=writer, args=("output.txt", queue))
    tw.start()
    # Wait for all compute threads to finish
    for i in range(number_of_threads):
        threads[i].join()
    # Signal to the writer loop to end
    queue.put(__END)
    # Wait for the writer thread to finish
    tw.join()

def check_port(ip_host: str, port: int, queue: Queue):
    time.sleep(0.5)
    # Enqueue the result to the synchronisation queue
    queue.put((ip_host, port))

def writer(filename: str, queue: Queue):
    with open(filename, "w") as f:
        while True:
            item = queue.get()
            # End the write loop if there is no more item
            # to process
            if item is __END:
                break
            # This write operation is sequential and thread-safe
            ip_host, port = item
            f.write(f"Port {port} of host {ip_host} is open.\n")

def main():
    check_range_ports("127.0.0.1")

if __name__ == "__main__":
    main()
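To make this example perform the actual scan from the question, only check_port needs to change: do the connect_ex probe there and enqueue the outcome instead of sleeping. A minimal sketch, assuming the writer is adjusted to unpack the extra state field:

import socket
from queue import Queue

def check_port(ip_host: str, port: int, queue: Queue):
    # Probe the port (as in the question's code), then hand the
    # result to the single writer thread via the queue.
    my_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    my_socket.settimeout(5)
    result = my_socket.connect_ex((ip_host, port))
    my_socket.close()
    queue.put((ip_host, port, "open" if result == 0 else "closed"))

The writer loop would then do ip_host, port, state = item and write f"Port {port} of host {ip_host} is {state}.\n" instead of the hard-coded "is open" line.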