Python: implementing multi-thread flag option in a script


I'm writing a simple script that takes two TSV files as input (file_a.tsv and file_b.tsv) and checks, for every value in file_a.tsv, whether it falls within one of the ranges specified in file_b.tsv. This is the script:

import os
import sys
import argparse
import pandas as pd

# Parse the command-line arguments
def myfunc() -> argparse.Namespace:
    ap = argparse.ArgumentParser()
    ap.add_argument("-a", "--file_a", help="path to file_a")
    ap.add_argument("-b", "--file_b", help="path to file_b")
    return ap.parse_args()

args = myfunc()

file_a = args.file_a
file_b = args.file_b

# Load file_a and file_b as dataframes
with open(file_a, 'r') as a:
    file_a_df = pd.read_table(a, header=None)

with open(file_b, 'r') as b:
    file_b_df = pd.read_table(b, header=None)
    file_b_df.columns = ["seqname", "source", "feature", "start", "end", "score", "strand", "frame", "attribute"]

# Two lists to collect the results
contained = []
not_contained = []

# Check whether a single value from file_a falls in any range from file_b
def fileparser(val):
    for _, row in file_b_df.iterrows():
        if row['start'] <= val <= row['end']:
            contained.append(val)
            return True
    not_contained.append(val)
    return False

# Apply fileparser to the second column of file_a
file_a_df.iloc[:, 1].apply(fileparser)

print("Contained: ", len(contained))
print("Not contained: ", len(not_contained)

Running it from terminal looks like this:

python my_script.py -a "path/to/file_a" -b "path/to/file_b"

Here's the issue: file_a has almost 7 million values and file_b has thousands of ranges to check against, so this is far too much work to do in a single thread.

I'd like to add a -p flag to my command, like the one below, to enable multi-threading and speed up the process:

python my_script.py -p 8 -a "path/to/file_a" -b "path/to/file_b"

I know the threading library can be imported; how could I add it to my script? Thank you.

CodePudding user response:

If you have millions of lines, you're better off starting by optimizing the complexity (big-O) of your algorithm before parallelizing. Here you're comparing every value in a against every range in b, and on top of that with the slow .apply() method and manual row iteration; let's replace all of that with pandas/numpy primitives.

The free bonus is that you no longer need to handle parallelism yourself; numpy will do it under the hood for you. See for example this guide, under the "Use parallel primitives" part.

I’ll suppose for optimising purposes that you have N values in file_a and M distinct intervals.

1. We can start by coalescing all your b intervals into the minimum number of distinct intervals

>>> starts = df['start'].value_counts().sort_index()
>>> ends = df['end'].value_counts().sort_index()
>>> ends.index += 1
>>> idx = ends.index.join(starts.index, how='outer')
>>> depth = (starts.reindex(idx, fill_value=0) - ends.reindex(idx, fill_value=0)).cumsum()
>>> depth[depth.eq(0) | depth.eq(0).shift(fill_value=True)]
0      1
36     0
38     1
40     0
41     1
86     0
87     1
103    0
dtype: int64
>>> min_intervals = pd.DataFrame({
...     'start': idx[depth.eq(0).shift(fill_value=True)],
...     'end': idx[depth.eq(0)] - 1
... })
>>> min_intervals
   start  end
0      0   35
1     38   39
2     41   85
3     87  102

The main subtlety here is the +1 and -1 on the end bound: the bounds are inclusive, and we want the union of contiguous intervals to be computed correctly, i.e. [3, 5] and [6, 8] should merge into [3, 8].
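To see why, here is a minimal self-contained sketch (on made-up toy intervals, not your file_b data): with the += 1 the two contiguous intervals collapse into one, without it they stay separate.

import pandas as pd

# Toy data (hypothetical): [3, 5] and [6, 8] are contiguous inclusive intervals
# and should merge into [3, 8].
df = pd.DataFrame({'start': [3, 6], 'end': [5, 8]})

starts = df['start'].value_counts().sort_index()
ends = df['end'].value_counts().sort_index()
ends.index += 1   # make the inclusive end bound exclusive

idx = ends.index.join(starts.index, how='outer')
depth = (starts.reindex(idx, fill_value=0) - ends.reindex(idx, fill_value=0)).cumsum()

min_intervals = pd.DataFrame({
    'start': idx[depth.eq(0).shift(fill_value=True)],
    'end': idx[depth.eq(0)] - 1,
})
print(min_intervals)   # one row: start=3, end=8; without the += 1 you get two rows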

Of course, if you know your intervals are already disjoint, you can skip all that and simply do min_intervals = file_b_df[['start', 'end']].sort_values('start').

2. Compare values with the sorted bounds, O(N×M) -> O(N×log(M))

Note that min_intervals is now sorted, and that if we stack it we get the bounds in sorted order. Let's add back the +1 and use pd.Series.searchsorted() to find out where in this sequence of bounds a number would have to be inserted to preserve the order:

>>> bounds = min_intervals.stack()
>>> bounds.loc[slice(None), ['end']] += 1
>>> bounds
0  start      0
   end       36
1  start     38
   end       40
2  start     41
   end       86
3  start     87
   end      103
dtype: int64
>>> bounds.searchsorted([0, 1, 35, 36, 100, 86], 'right')
array([1, 1, 1, 2, 7, 6])

As you can see, numbers that fall inside an interval (0, 1 and 35 in [0, 35], 100 in [87, 103]) get an odd insertion point, while numbers that don't fall in any interval (36, 86) get an even one.

>>> file_a = pd.DataFrame({'val': [0, 1, 35, 36, 100, 86]})
>>> contained = pd.Series(bounds.searchsorted(file_a['val'], 'right'), index=file_a.index).mod(2).eq(1)
>>> file_a[contained]
   val
0    0
1    1
2   35
4  100
>>> file_a[~contained]
   val
3   36
5   86

This works well if you don't want to modify file_a. Of course, if you're willing to sort file_a, you can get even faster results (assuming N > M).

3. Compare sorted values with interval bounds, O(N×log(M)) -> O(log(N)×M)

>>> file_a.sort_values('val', inplace=True)

From there we can now use searchsorted on file_a, e.g. to compute the rows of values that are contained:

>>> rows = pd.Series(file_a['val'].searchsorted(bounds, 'left'), index=bounds.index)
>>> rows
0  start    0
   end      3
1  start    4
   end      4
2  start    4
   end      4
3  start    5
   end      6
dtype: int64
>>> pd.concat([file_a.iloc[slice(*v)] for k, v in rows.groupby(level=0)])
   val
0    0
1    1
2   35
4  100

Or we can construct a boolean indexer with the same start/end counting trick we used to merge the intervals:

>>> contained = rows.xs('start', 'index', 1).value_counts().reindex(rows.unique(), fill_value=0)
>>> contained -= rows.xs('end', 'index', 1).value_counts().reindex(rows.unique(), fill_value=0)
>>> contained = contained.cumsum().reindex(np.arange(len(file_a))).ffill().eq(1)
>>> contained.index = file_a.index  # Index was row numbers, use file_a’s index
>>> contained
0     True
1     True
2     True
3    False
5    False
4     True
dtype: bool
>>> file_a[contained]
   val
0    0
1    1
2   35
4  100
>>> file_a[~contained]
   val
3   36
5   86

The other upside of this strategy is that no explicit looping happens in the code we have written, so we don't need to worry about parallelism beyond letting numpy use it. If you really do want to add parallel optimisation yourself, this other question will be of interest; a rough sketch follows below.
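And if you still want the -p flag from your question, here is a rough, untested sketch of how it could look on top of the searchsorted idea, assuming the file_b intervals are disjoint (or have been coalesced as in step 1). It uses multiprocessing rather than threading, because the work is CPU-bound and pure-Python threads would be serialised by the GIL; the helper name check_chunk is illustrative, not a library API.

import argparse
import numpy as np
import pandas as pd
from multiprocessing import Pool

def check_chunk(args):
    """Return a boolean array: True where a value lands inside one of the intervals."""
    values, bounds = args
    # Odd insertion point -> inside an interval, exactly as in step 2 above.
    return np.searchsorted(bounds, values, side='right') % 2 == 1

if __name__ == "__main__":
    ap = argparse.ArgumentParser()
    ap.add_argument("-a", "--file_a", required=True)
    ap.add_argument("-b", "--file_b", required=True)
    ap.add_argument("-p", "--processes", type=int, default=1)
    args = ap.parse_args()

    file_a_df = pd.read_table(args.file_a, header=None)
    file_b_df = pd.read_table(args.file_b, header=None,
                              names=["seqname", "source", "feature", "start", "end",
                                     "score", "strand", "frame", "attribute"])

    # Flatten the sorted, disjoint intervals into [s0, e0+1, s1, e1+1, ...]
    intervals = file_b_df[['start', 'end']].sort_values('start').to_numpy()
    intervals[:, 1] += 1
    bounds = intervals.ravel()

    # Split the values across the requested number of worker processes
    values = file_a_df.iloc[:, 1].to_numpy()   # same column as in the original script
    chunks = [(chunk, bounds) for chunk in np.array_split(values, args.processes)]
    with Pool(args.processes) as pool:
        contained = np.concatenate(pool.map(check_chunk, chunks))

    print("Contained: ", int(contained.sum()))
    print("Not contained: ", int((~contained).sum()))

In practice the vectorised searchsorted already makes each chunk fast, so the pool mostly pays off when file_a is truly huge; measure before committing to the extra complexity.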
