I have a bash script which activates a Python script:
#!/bin/bash
#SBATCH -J XXXXX
#SBATCH --ntasks=1
#SBATCH --cpus-per-task=16
python my_python_script.py
The Python script scans a very large file (~480,000,000 rows) and builds a dictionary that is later written to an output file:
with open(huge_file, 'r') as hugefile, open(final_file, 'w') as final:
    reader = csv.reader(hugefile, delimiter="\t")
    writer = csv.writer(final, delimiter="\t")
    d = {}
    for r in reader:
        v = r[0] + r[1]
        if v not in d:
            d[v] = [[r[5], r[4]]]   # keep a list of [r[5], r[4]] pairs per key
        else:
            d[v].append([r[5], r[4]])
    for k, v in d.items():
        # analyses
        nl = [different variables]
        writer.writerow(nl)
Due to the size of the file, I want to use 16 CPUs for the run, but even though I requested 16 CPUs in my bash script, it only uses 1 CPU.
I have read a lot about subprocess, but it does not seem to apply in this case. I would love to hear any suggestions.
CodePudding user response:
Cores won't help you here, as the dictionary manipulation is trivial and extremely fast.
You have an I/O problem, where reading and writing the files is the bottleneck.
If you use the multiprocessing module you may run into other issues. The dictionaries built by the worker processes will be independent of each other, so the same key can end up in several of them, each with different data. If the ordering of the CSV data must be kept, perhaps because it is time-series data, you will have to merge and then sort the lists in the dictionary as an additional step, unless you take this problem into account while merging the 16 dictionaries. This also means breaking the CSV into 16 chunks and processing them individually on each core, so that you can keep track of the ordering.
Have you considered reading the huge CSV file into a SQLite database? That would at least give you more control over how the data is accessed, since 16 processes could read the data at the same time while specifying the ordering.
I really doubt that there is anything to parallelize here. Even if you use the multiprocessing module, you need to write the entire file while taking the entire dictionary into account, which limits how much of this task can be parallelized.
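As a minimal sketch of the SQLite idea (untested; the database path, table name and column names below are made up for illustration, and the key is built as r[0] + r[1] like in the question):
import csv
import sqlite3

huge_file = "hugefile.tsv"          # placeholder path to the ~480,000,000-row file

conn = sqlite3.connect("huge.db")   # on-disk database next to the data
conn.execute("CREATE TABLE IF NOT EXISTS rows (k TEXT, c5 TEXT, c4 TEXT)")

# load the tab-separated file once; executemany accepts a generator,
# so the whole file never has to sit in memory
with open(huge_file, "r") as hugefile:
    reader = csv.reader(hugefile, delimiter="\t")
    conn.executemany(
        "INSERT INTO rows VALUES (?, ?, ?)",
        ((r[0] + r[1], r[5], r[4]) for r in reader),
    )
conn.execute("CREATE INDEX IF NOT EXISTS idx_k ON rows (k)")
conn.commit()

# any number of processes can now read in parallel, e.g. all values for one
# key in the original file order (rowid preserves insertion order)
cur = conn.execute("SELECT c5, c4 FROM rows WHERE k = ? ORDER BY rowid", ("somekey",))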
CodePudding user response:
Multiprocessing is difficult to apply because everything needs to be collected into a central dict d. Several processes would constantly have to know which keys are already in the dict, and that makes it really complex. So the easier solution is to try to speed up processing while staying within one process. Dict and list comprehensions seem to be a good way forward:
# prepare dict keys and empty list entries:
d = {r[0] + r[1]: [] for r in reader}

# rewind the file for a second pass (the reader was exhausted above)
hugefile.seek(0)
reader = csv.reader(hugefile, delimiter="\t")

# fill dict
[d[r[0] + r[1]].append([r[5], r[4]]) for r in reader]
# d is ready for analysis
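Note that the two comprehensions above read the file twice (hence the seek). A single-pass variant using collections.defaultdict, sketched here under the same column assumptions, avoids the second pass:
from collections import defaultdict

d = defaultdict(list)
for r in reader:                      # one pass over the file
    d[r[0] + r[1]].append([r[5], r[4]])
# d is ready for analysis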
CodePudding user response:
Here is an idea of how to use multiple processes (not yet tested with a large file, but I am confident it will work once debugged).
Step 1 is to split the huge file into segments using the Linux split command:
bash> split -l 10000000 hugefile segment
This will create files of 10,000,000 lines each, named segmentaa, segmentab, ... (see the man page of split).
Now the Python program reads these file segments, launches one process per segment, and then consolidates the results into one dict:
import multiprocessing as mp
import csv
# define process function working on a file segment
def proc_target(q, filename):
    with open(filename, 'r') as file_segment:
        reader = csv.reader(file_segment, delimiter="\t")
        dd = dict()

        def func(r):
            key = r[0] + r[1]
            if key in dd:
                dd[key].append([r[5], r[4]])
            else:
                dd[key] = [[r[5], r[4]]]   # list of [r[5], r[4]] pairs per key

        [func(r) for r in reader]
    # send result via queue to main process
    q.put(dd)
if __name__ == '__main__':
    segment_names = ['segmentaa', 'segmentab', 'segmentac']  # maybe there are more file segments ...
    processes = dict()  # all objects needed are stored in this dict
    mp.set_start_method('spawn')

    # launch processes
    for fn in segment_names:
        processes[fn] = dict()
        q = mp.Queue()
        p = mp.Process(target=proc_target, args=(q, fn))
        p.start()
        processes[fn]["process"] = p
        processes[fn]["queue"] = q

    # read results
    for fn in segment_names:
        processes[fn]["result"] = processes[fn]["queue"].get()
        processes[fn]["process"].join()

    # consolidate all results
    # start with first segment result and merge the others into it
    d = processes[segment_names[0]]["result"]

    # helper function for fast execution using list comprehension
    def consolidate(key, value):
        if key in d:
            d[key].extend(value)   # value is a list of [r[5], r[4]] pairs
        else:
            d[key] = value

    # merge other results into d
    for fn in segment_names[1:]:
        [consolidate(key, value) for key, value in processes[fn]["result"].items()]

    # d is ready
In order to avoid I/O bottlenecks, it might be wise to distribute the segments over several disks and let the parallel processes access different I/O resources in parallel.
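A more compact variant of the same idea (a sketch, untested, assuming the same segment files and column layout as above) would let a multiprocessing.Pool handle the queueing and result collection:
import csv
import multiprocessing as mp

def build_dict(filename):
    # build a per-segment dict: key -> list of [r[5], r[4]] pairs
    dd = {}
    with open(filename, 'r') as file_segment:
        for r in csv.reader(file_segment, delimiter="\t"):
            dd.setdefault(r[0] + r[1], []).append([r[5], r[4]])
    return dd

if __name__ == '__main__':
    segment_names = ['segmentaa', 'segmentab', 'segmentac']  # adjust to the files split created
    d = {}
    with mp.Pool(processes=16) as pool:
        # imap yields the per-segment dicts in segment order,
        # so entries are merged in file order
        for dd in pool.imap(build_dict, segment_names):
            for key, value in dd.items():
                d.setdefault(key, []).extend(value)
    # d is ready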