Python multiprocessing won't stop running and seems to take much longer

Time:10-22

I'm trying to use multiprocessing to speed up some data labeling that I'm doing, but I've noticed that it takes much longer (I have never actually seen the program terminate). The original script takes about 7 hours to run, but when I came to work this morning, the parallel version that I had started yesterday evening was still running.

Task Overview

Input:
  1) Pandas DataFrame with a column of text
  2) Dictionary that looks like {word: label}.

(Desired) Output:
  Same DataFrame but this time with the positions of the words marked in the text.

Example:
DataFrame:
----------------------------------------------------
  | text
0 | I live in the United States.
----------------------------------------------------

Dict: {'United States': 'country'}

Output DataFrame:
----------------------------------------------------
  | text                         | labels
0 | I live in the United States. | [14, 26]
----------------------------------------------------

To explain the outcome a bit, the substring 'United States' is at position 14-26 within the text. I'm basically iterating over the DataFrame, further iterating over the dictionary, and then marking the positions using regular expressions.
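
As a quick illustration of the span convention (a minimal sketch, not the code from this post): Python's re module reports match.end() as the index just past the match, so for this sentence the span comes out as [14, 27], which covers the same substring as the inclusive positions 14-26 described above. The variable names here are chosen only for the example.

```python
import re

text = "I live in the United States."
label_dict = {"United States": "country"}

spans = []
for word in label_dict:
    # re.escape keeps the key literal in case it contains regex metacharacters
    for match in re.finditer(re.escape(word), text):
        spans.append([match.start(), match.end()])

print(spans)        # end index is exclusive
print(text[14:27])  # slicing with the span returns the labeled substring
```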

What I Did

<Original Code>

import re
from tqdm import tqdm

def label_data(df, label_dict):
    pbar = tqdm(iterable=df.to_dict('records'), total=df.shape[0])
    for idx, row in enumerate(pbar):
        text = row['text']
        spans = []
        # one regex scan of the text per dictionary key
        for word, label in label_dict.items():
            for match in re.finditer(word, text):
                start = match.start()
                end = match.end()
                spans.append([start, end])

        row['labels'] = spans
        df.iloc[idx] = row

    return df

<Parallelized Code>
from itertools import product
import multiprocessing as mp
import numpy as np

def label_data_parallel(df, dict):
    num_cores = mp.cpu_count() - 1
    pool = mp.Pool(num_cores)

    df_chunks = np.array_split(df, num_cores)
    labeled_dfs = pool.starmap(label_data, \
                               product(df_chunks, [dict] * num_cores))
    df = pd.concat(labeled_dfs, axis=0)

    pool.close()
    pool.join()

    return df

Is there anything wrong with my code? Also, the DataFrame has around 200,000 rows and the dictionary has about 3,000 key-value pairs.

CodePudding user response:

Did you think about another algorithm?

Three ideas:

  1. Don't iterate over the DataFrame; search through the combined text instead. String search has been studied and optimized for decades, so it should be quite fast and, hopefully, well implemented in Python's re library. To avoid adding labels that match only because lines were glued together, join the lines with a gibberish separator; I used "@@@@@@". The gibberish separator (not pretty, I admit) can be replaced by a simple check that skips any match that crosses a line border.
  2. All the keys to be searched can also be glued into one regex pattern, so that the re library does all the work in a more efficient way.
  3. The regex pattern can be optimized as a trie, as suggested here: Speed up millions of regex replacements in Python 3

For example:

import nltk
import pandas as pd
import re
import string
import random
from nltk.corpus import words

random.seed(1)

# ------- this part only creates test data; nltk is not needed otherwise

nltk.download('words')

all_words = words.words()

data_df = pd.DataFrame(
    [
        ' '.join(random.choices(all_words, k=random.randint(1,20))) for _ in range(200000)
    ], columns = ["text"])

label_keys = {
    random.choice(all_words) for _ in range(3000)    
}

# -------- code starts here

class Trie():
    """Regex::Trie in Python. Creates a Trie out of a list of words. The trie can be exported to a Regex pattern.
    The corresponding Regex should match much faster than a simple Regex union."""

    def __init__(self):
        self.data = {}

    def add(self, word):
        ref = self.data
        for char in word:
            ref[char] = char in ref and ref[char] or {}
            ref = ref[char]
        ref[''] = 1

    def dump(self):
        return self.data

    def quote(self, char):
        return re.escape(char)

    def _pattern(self, pData):
        data = pData
        if "" in data and len(data.keys()) == 1:
            return None

        alt = []
        cc = []
        q = 0
        for char in sorted(data.keys()):
            if isinstance(data[char], dict):
                try:
                    recurse = self._pattern(data[char])
                    alt.append(self.quote(char) + recurse)
                except:
                    cc.append(self.quote(char))
            else:
                q = 1
        cconly = not len(alt) > 0

        if len(cc) > 0:
            if len(cc) == 1:
                alt.append(cc[0])
            else:
                alt.append('[' + ''.join(cc) + ']')

        if len(alt) == 1:
            result = alt[0]
        else:
            result = "(?:" + "|".join(alt) + ")"

        if q:
            if cconly:
                result += "?"
            else:
                result = "(?:%s)?" % result
        return result

    def pattern(self):
        return self._pattern(self.dump())

trie_pattern = Trie()

for label in label_keys:
    trie_pattern.add(label)  # no re.escape here: the Trie already escapes each character in quote()

reg_pattern = trie_pattern.pattern()

list_of_texts = list(data_df.text)

indices = list(map(len,list_of_texts))

all_text = "@@@@@@".join(data_df.text) # @@@@@@ - something of known length you don't expect in the text

all_matches = []
for match_ in re.finditer(reg_pattern, all_text):
    all_matches.append(match_.span())
all_matches.sort(key=lambda x: x[0])

label_l = []
start = 0
all_matches_pointer = 0
indices_pointer = 0
label_l.append([]) 
while True:    
    if all_matches_pointer >= len(all_matches):
        for _ in range(len(label_l),len(data_df)):
            label_l.append( [])
        break
    match_start = all_matches[all_matches_pointer][0]
    match_end = all_matches[all_matches_pointer][1]
    if match_start >= start + indices[indices_pointer]:
        label_l.append([])
        start += indices[indices_pointer] + 6 # len("@@@@@@")
        indices_pointer += 1
    else:
        label_l[-1] += [(match_start - start, match_end - start)]
        all_matches_pointer += 1
            
    
data_df["labels"] = label_l
data_df

This gives you the desired result in a few seconds:

    text    labels
0   overempty stirring asyla butchering Sherrymoor  [(5, 6), (19, 21), (42, 43)]
1   premeditator spindliness bilamellate amidosucc...   [(3, 4), (8, 10), (29, 30), (33, 35), (38, 39)...
2   Radek vivicremation rusot noegenetic Shropshir...   [(13, 14), (14, 16), (50, 52), (76, 78), (88, ...
3   uninstructiveness blintze plunging rowiness fi...   [(58, 59), (87, 88), (109, 110), (122, 124), (...
4   memorialize scruffman   [(0, 1), (2, 3), (18, 19)]
... ... ...
199995  emulsor treatiser   [(1, 2), (11, 13)]
199996  squibling anisandrous incorrespondent vague jo...   [(13, 15), (40, 43), (52, 53), (71, 73), (130,...
199997  proallotment bulletheaded uningenuousness plat...   [(0, 5), (8, 9), (44, 46), (62, 65), (75, 77)]
199998  unantiquatedness sulphohalite oversoftness und...   [(6, 10), (32, 35), (65, 67), (68, 71), (118, ...
199999  lenticulothalamic aerometric plastidium panell...   [(14, 15), (22, 23), (31, 33), (38, 39), (46, ...
200000 rows × 2 columns

I tried this specifically with your parameters (see code): a DataFrame of 200k rows and 3,000 labels. The algorithm runs in just 3-5 seconds on my M1.
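
The border check mentioned in idea 1 (as an alternative to the gibberish separator) could look roughly like the sketch below. It is only an illustration under assumptions: the rows are joined with a single space, and the names texts, starts and labels are made up for this example. Each match is mapped back to its row with bisect, and a match that runs past the end of its row is dropped.

```python
import re
from bisect import bisect_right
from itertools import accumulate

texts = ["I live in the United", "States of mind", "United States it is"]
pattern = re.compile(re.escape("United States"))

# starts[i] is the offset of row i inside the joined string (1-char separator)
lengths = [len(t) for t in texts]
starts = [0] + list(accumulate(n + 1 for n in lengths))[:-1]
all_text = " ".join(texts)

labels = [[] for _ in texts]
for m in pattern.finditer(all_text):
    row = bisect_right(starts, m.start()) - 1  # row containing the match start
    row_end = starts[row] + lengths[row]
    if m.end() > row_end:
        continue  # the match crosses a row border, so it is not a real label
    labels[row].append((m.start() - starts[row], m.end() - starts[row]))

print(labels)
```

One caveat with this variant: a discarded cross-border match still consumes text, so a genuine match that overlaps it at the start of the next row would be missed. A separator that cannot occur in any label avoids that.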

Problems not tackled yet, which really depend on your input:

  1. What if labels overlap inside one line? Then you would need to add a loop so that each iteration searches for each label separately (and that loop can be multiprocessed).
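
To make the overlap problem concrete, here is a small sketch (the sentence and the two labels are invented for illustration). A single combined pattern scanned with finditer returns only non-overlapping matches, so a label that begins inside an earlier match is not reported, while a separate search per label finds both.

```python
import re

text = "I live in the United States of America."
labels = ["United States", "States of America"]

# One combined pattern: finditer resumes after each match, so overlaps are lost
combined = re.compile("|".join(re.escape(w) for w in labels))
combined_spans = [m.span() for m in combined.finditer(text)]

# One search per label: every label is found independently
per_label_spans = sorted(
    m.span() for w in labels for m in re.finditer(re.escape(w), text)
)

print(combined_spans)
print(per_label_spans)
```

The per-label loop is slower, which is why it only makes sense if overlapping labels can actually occur in the input.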

CodePudding user response:

There are several efficiencies that can be made:

  1. Instead of calling re.finditer once per country, you could create a single optimized regular expression that searches for any of the countries in label_dict and pass that regex to your worker function. Given that you are searching for 3,000 countries, this should greatly improve the speed.
  2. You only need to pass an array of strings, instead of an array of dataframes, to the worker function, along with the aforementioned compiled regular expression.
  3. You are leaving the main process a processor to use by creating a pool of size mp.cpu_count() - 1. But then you are calling starmap, which blocks until all the results have been returned, so the main process sits idle while the pool works. Instead you could use method imap, which lets the main process start handling results as soon as a worker function returns something. But the amount of processing being done by the main process may not warrant dedicating a processor to it. In the code below I am using all the available processors for the pool. But you could try leaving one free for the main process to see if that is more performant.
  4. The worker function only has to return a list of the spans it found. The main process will add a new column to the original dataframe using this data.

def label_data(texts, regex):
    return [
        [[match.start(), match.end()] for match in regex.finditer(text)]
        for text in texts
        ]

def label_data_parallel(df, label_dict):
    import multiprocessing as mp
    import numpy as np
    import re
    from functools import partial

    class Trie():
        """Regex::Trie in Python. Creates a Trie out of a list of words. The trie can be exported to a Regex pattern.
        The corresponding Regex should match much faster than a simple Regex union."""

        def __init__(self):
            self.data = {}

        def add(self, word):
            ref = self.data
            for char in word:
                ref[char] = char in ref and ref[char] or {}
                ref = ref[char]
            ref[''] = 1

        def dump(self):
            return self.data

        def quote(self, char):
            return re.escape(char)

        def _pattern(self, pData):
            data = pData
            if "" in data and len(data.keys()) == 1:
                return None

            alt = []
            cc = []
            q = 0
            for char in sorted(data.keys()):
                if isinstance(data[char], dict):
                    try:
                        recurse = self._pattern(data[char])
                        alt.append(self.quote(char) + recurse)
                    except:
                        cc.append(self.quote(char))
                else:
                    q = 1
            cconly = not len(alt) > 0

            if len(cc) > 0:
                if len(cc) == 1:
                    alt.append(cc[0])
                else:
                    alt.append('[' + ''.join(cc) + ']')

            if len(alt) == 1:
                result = alt[0]
            else:
                result = "(?:" + "|".join(alt) + ")"

            if q:
                if cconly:
                    result += "?"
                else:
                    result = "(?:%s)?" % result
            return result

        def pattern(self):
            return self._pattern(self.dump())


    num_cores = mp.cpu_count()

    text_chunks = np.array_split(df['text'].values.tolist(), num_cores)

    trie = Trie()
    for country in label_dict.keys():
        trie.add(country)
    regex = re.compile(trie.pattern())

    pool = mp.Pool(num_cores)

    label_spans = []
    for spans in pool.imap(partial(label_data, regex=regex), text_chunks):
        label_spans.extend(spans)
    pool.close()
    pool.join()

    df['labels'] = label_spans


    return df

def main():
    import pandas as pd

    df = pd.DataFrame({'text': [
        'I live in the United States.',
        'I live in the United States.',
        'I live in the United States.',
        'I live in the United States.',
        'I live in the United States.',
        'I live in the United States.',
        'I live in the United States.',
        'I live in the United States.',
        'France is where I live But I used to live in the United States.',
        'France is where I live But I used to live in the United States.',
        'France is where I live But I used to live in the United States.',
        'France is where I live But I used to live in the United States.',
        'France is where I live But I used to live in the United States.',
        'France is where I live But I used to live in the United States.',
        'France is where I live But I used to live in the United States.',
        'France is where I live But I used to live in the United States.',
    ]})

    label_dict = {
        'United States': 'country',
        'France': 'country',
    }

    label_data_parallel(df, label_dict)
    print(df)

if __name__ == '__main__':
    main()

Prints:

                                                 text              labels
0                        I live in the United States.          [[14, 27]]
1                        I live in the United States.          [[14, 27]]
2                        I live in the United States.          [[14, 27]]
3                        I live in the United States.          [[14, 27]]
4                        I live in the United States.          [[14, 27]]
5                        I live in the United States.          [[14, 27]]
6                        I live in the United States.          [[14, 27]]
7                        I live in the United States.          [[14, 27]]
8   France is where I live But I used to live in t...  [[0, 6], [49, 62]]
9   France is where I live But I used to live in t...  [[0, 6], [49, 62]]
10  France is where I live But I used to live in t...  [[0, 6], [49, 62]]
11  France is where I live But I used to live in t...  [[0, 6], [49, 62]]
12  France is where I live But I used to live in t...  [[0, 6], [49, 62]]
13  France is where I live But I used to live in t...  [[0, 6], [49, 62]]
14  France is where I live But I used to live in t...  [[0, 6], [49, 62]]
15  France is where I live But I used to live in t...  [[0, 6], [49, 62]]
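
A side note on the question's parallel code, offered as an observation about itertools.product rather than something stated in the answer above: product(df_chunks, [dict] * num_cores) pairs every chunk with every copy of the dictionary, so starmap receives num_cores * num_cores tasks instead of num_cores, and the concatenated result contains each row num_cores times. That may account for part of the slowdown. The sketch below uses placeholder strings in place of the DataFrame chunks.

```python
from itertools import product

num_cores = 4
chunks = [f"chunk{i}" for i in range(num_cores)]  # stand-ins for DataFrame chunks
label_dict = {"United States": "country"}

# Cartesian product: every chunk is paired with every copy of the dictionary
tasks_product = list(product(chunks, [label_dict] * num_cores))

# zip: each chunk is paired with the dictionary exactly once
tasks_zip = list(zip(chunks, [label_dict] * num_cores))

print(len(tasks_product), len(tasks_zip))
```

With zip (or functools.partial, as in the answer above) each chunk is processed once.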