I'm writing a script to compare many DNA genomes with each other, and I'm trying to use multiprocessing to make it run faster. All the processes append to a common list, genome_score_avgs.
This is my main process:
if __name__ == "__main__":
    start = time.perf_counter()
    with Manager() as manager:
        genome_score_avgs = manager.list()
        processes = [Process(target=compareGenomes, args=(chunk, genome_score_avgs,)) for chunk in divideGenomes('TEST_DIR')]
        for p in processes:
            p.start()
        for p in processes:
            p.join()
    print(genome_score_avgs)
    print(*createTimeline(genome_score_avgs), sep='\n')
    print(f'Finished in {time.perf_counter() - start} seconds')
This is the error that I'm getting:
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/managers.py", line 801, in _callmethod
    conn = self._tls.connection
AttributeError: 'ForkAwareLocal' object has no attribute 'connection'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/ayushpal/Coding/PythonStuff/C4DInter/main.py", line 59, in <module>
    print(*createTimeline(genome_score_avgs), sep='\n')
  File "/Users/ayushpal/Coding/PythonStuff/C4DInter/main.py", line 42, in createTimeline
    min_score = min(score_avgs, key=lambda x: x[2])
  File "<string>", line 2, in __getitem__
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/managers.py", line 805, in _callmethod
    self._connect()
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/managers.py", line 792, in _connect
    conn = self._Client(self._token.address, authkey=self._authkey)
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/connection.py", line 507, in Client
    c = SocketClient(address)
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/connection.py", line 635, in SocketClient
    s.connect(address)
FileNotFoundError: [Errno 2] No such file or directory
<ListProxy object, typeid 'list' at 0x7fc04ea36bb0; '__str__()' failed>
I read in a similar Stack Overflow question that the main process ends earlier than the other processes, which destroys the shared list, and that I should call p.join() for all the processes. That is what I'm doing; however, it still gives the same error. What should I do?
EDIT 1:
This is the code for compareGenomes():
def compareGenomes(genome_pairings, genome_score_avgs):
    scores = []
    for genome1, genome2 in genome_pairings:
        print(genome1, genome2)
        for i, seq in enumerate(genome1.protein_seqs):
            for j, seq2 in enumerate(genome2.protein_seqs[i::]):
                alignment = align.globalxx(seq, seq2)
                scores.append(alignment)
        top_scores = []
        for i in range(len(genome1.protein_seqs)):
            top_scores.append(max(scores, key=lambda x: x[0][2] / len(x[0][1])))
            scores.remove(max(scores, key=lambda x: x[0][2] / len(x[0][1])))
        avg_score = sum([i[0][2] / len(i[0][1]) for i in top_scores]) / len(top_scores)
        with open(f'alignments/{genome1.name}x{genome2.name}.txt', 'a') as file:
            file.writelines([format_alignment(*i[0]) for i in top_scores])
        genome_score_avgs.append((genome1, genome2, avg_score))
Answer:
The error happens because you are using the managed list after the manager has been closed. Leaving the with block shuts down the server process that the manager started, so the list proxy can no longer connect to it (hence the FileNotFoundError on the socket), and your managed list stops working. You need to use the list inside the with block, like below:
import time
from multiprocessing import Manager, Process

if __name__ == "__main__":
    start = time.perf_counter()
    with Manager() as manager:
        genome_score_avgs = manager.list()
        processes = [Process(target=compareGenomes, args=(chunk, genome_score_avgs,)) for chunk in divideGenomes('TEST_DIR')]
        for p in processes:
            p.start()
        for p in processes:
            p.join()
        # Still inside the with block: the manager is alive, so the proxy works.
        print(genome_score_avgs)
        print(*createTimeline(genome_score_avgs), sep='\n')
    print(f'Finished in {time.perf_counter() - start} seconds')
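If you would rather keep working with the results after the with block, one option is to copy the managed list into a plain Python list while the manager is still running. The sketch below is only an illustration of that idea: record_squares and run are hypothetical names, and record_squares is a trivial stand-in for compareGenomes, not your actual comparison code.

```python
from multiprocessing import Manager, Process


def record_squares(chunk, shared_scores):
    """Hypothetical stand-in for compareGenomes: append one tuple per item."""
    for value in chunk:
        shared_scores.append((value, value * value))


def run(chunks):
    """Start one process per chunk and return the results as a plain list."""
    with Manager() as manager:
        shared_scores = manager.list()
        processes = [
            Process(target=record_squares, args=(chunk, shared_scores))
            for chunk in chunks
        ]
        for p in processes:
            p.start()
        for p in processes:
            p.join()
        # Copy out of the proxy while the manager's server process is alive.
        # Sorting only makes the order independent of process scheduling.
        return sorted(list(shared_scores))


if __name__ == "__main__":
    results = run([[1, 2], [3, 4]])
    # results is an ordinary list, so it stays usable after the manager exits.
    print(results)
```

The copy costs one extra pass over the data, but everything after run() returns works on a normal list and no longer depends on the manager.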