I'm trying to use multiprocessing to speed up some data labeling, but the parallel version takes much longer (I actually never saw it terminate). The original script takes about 7 hours to run; I started the parallel version yesterday evening and it was still running when I came to work this morning.
Task Overview
Input:
1) Pandas DataFrame with a column of text
2) Dictionary that looks like {word: label}.
(Desired) Output:
Same DataFrame but this time with the positions of the words marked in the text.
Example:
DataFrame:
----------------------------------------------------
| text
0 | I live in the United States.
----------------------------------------------------
Dict: {'United States': 'country'}
Output DataFrame:
----------------------------------------------------
| text | labels
0 | I live in the United States. | [14, 26]
----------------------------------------------------
To explain the outcome a bit, the substring 'United States' is at position 14-26 within the text. I'm basically iterating over the DataFrame, further iterating over the dictionary, and then marking the positions using regular expressions.
What I Did
<Original Code>
import re
from tqdm import tqdm

def label_data(df, label_dict):
    pbar = tqdm(iterable=df.to_dict('records'), total=df.shape[0])
    for idx, row in enumerate(pbar):
        text = row['text']
        spans = []
        for word, label in label_dict.items():
            for match in re.finditer(word, text):
                start = match.start()
                end = match.end()
                spans.append([start, end])
        row['labels'] = spans
        df.iloc[idx] = row
    return df
<Parallelized Code>
from itertools import product
import multiprocessing as mp
import numpy as np
import pandas as pd

def label_data_parallel(df, label_dict):
    num_cores = mp.cpu_count() - 1
    pool = mp.Pool(num_cores)
    df_chunks = np.array_split(df, num_cores)
    labeled_dfs = pool.starmap(label_data,
                               product(df_chunks, [label_dict] * num_cores))
    df = pd.concat(labeled_dfs, axis=0)
    pool.close()
    pool.join()
    return df
Is there anything wrong with my code? Also, the DataFrame has around 200,000 rows and the dictionary has about 3,000 key-value pairs.
CodePudding user response:
Did you think about another algorithm?
Three ideas:
- Not iterating over the DataFrame, but instead searching through one combined text. String search has been optimized and studied for decades, so it should be quite fast and is well implemented in Python's re library. However, so as not to add labels that only appear because lines were glued together, add a gibberish separator; I used "@@@@@@". The gibberish separator (I admit it doesn't look good) can be replaced by a simple check that a match lies within line borders, skipping it otherwise.
- All keys to be searched can also be glued into one regex pattern, so all the work is done by the re library in a more efficient way (a minimal union-only baseline is sketched right after this list).
- The regex pattern can be optimized as a trie, as suggested here: Speed up millions of regex replacements in Python 3
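As a point of reference for the second idea, here is a minimal sketch of the plain union pattern (the keys and the text are made up for illustration). The trie in the full code below matches exactly the same strings, just with a pattern the regex engine can run much faster:
import re

# Idea 2 on its own: escape every key and compile one alternation.
# Longer keys go first so a shorter key cannot shadow a longer one,
# since re alternation takes the leftmost alternative that matches.
keys = ["United States", "United Kingdom", "France"]
union_regex = re.compile("|".join(map(re.escape, sorted(keys, key=len, reverse=True))))

print([m.span() for m in union_regex.finditer("France is where I live, not the United States.")])
# [(0, 6), (32, 45)]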
Putting all three ideas together, e.g. like this:
import nltk
import pandas as pd
import re
import string
import random
from nltk.corpus import words
random.seed(1)
# ------- this is just to create nice test data. Otherwise no need in nltk
nltk.download('words')
all_words = words.words()
data_df = pd.DataFrame(
    [
        ' '.join(random.choices(all_words, k=random.randint(1, 20))) for _ in range(200000)
    ], columns=["text"])
label_keys = {
    random.choice(all_words) for _ in range(3000)
}
# -------- code starts here
class Trie():
    """Regex::Trie in Python. Creates a Trie out of a list of words. The trie can be exported to a Regex pattern.
    The corresponding Regex should match much faster than a simple Regex union."""

    def __init__(self):
        self.data = {}

    def add(self, word):
        ref = self.data
        for char in word:
            ref[char] = char in ref and ref[char] or {}
            ref = ref[char]
        ref[''] = 1

    def dump(self):
        return self.data

    def quote(self, char):
        return re.escape(char)

    def _pattern(self, pData):
        data = pData
        if "" in data and len(data.keys()) == 1:
            return None
        alt = []
        cc = []
        q = 0
        for char in sorted(data.keys()):
            if isinstance(data[char], dict):
                try:
                    recurse = self._pattern(data[char])
                    alt.append(self.quote(char) + recurse)
                except:
                    cc.append(self.quote(char))
            else:
                q = 1
        cconly = not len(alt) > 0
        if len(cc) > 0:
            if len(cc) == 1:
                alt.append(cc[0])
            else:
                alt.append('[' + ''.join(cc) + ']')
        if len(alt) == 1:
            result = alt[0]
        else:
            result = "(?:" + "|".join(alt) + ")"
        if q:
            if cconly:
                result += "?"
            else:
                result = "(?:%s)?" % result
        return result

    def pattern(self):
        return self._pattern(self.dump())
trie_pattern = Trie()
for label in label_keys:
    trie_pattern.add(re.escape(label))
reg_pattern = trie_pattern.pattern()

list_of_texts = list(data_df.text)
indices = list(map(len, list_of_texts))
all_text = "@@@@@@".join(data_df.text)  # @@@@@@ - something of known length you don't expect in the text

all_matches = []
for match_ in re.finditer(reg_pattern, all_text):
    all_matches.append(match_.span())
all_matches.sort(key=lambda x: x[0])

label_l = []
start = 0
all_matches_pointer = 0
indices_pointer = 0
label_l.append([])
while True:
    if all_matches_pointer >= len(all_matches):
        for _ in range(len(label_l), len(data_df)):
            label_l.append([])
        break
    match_start = all_matches[all_matches_pointer][0]
    match_end = all_matches[all_matches_pointer][1]
    if match_start >= start + indices[indices_pointer]:
        label_l.append([])
        start += indices[indices_pointer] + 6  # len("@@@@@@")
        indices_pointer += 1
    else:
        label_l[-1] += [(match_start - start, match_end - start)]
        all_matches_pointer += 1

data_df["labels"] = label_l
data_df
This gives you the desired result in a few seconds:
text labels
0 overempty stirring asyla butchering Sherrymoor [(5, 6), (19, 21), (42, 43)]
1 premeditator spindliness bilamellate amidosucc... [(3, 4), (8, 10), (29, 30), (33, 35), (38, 39)...
2 Radek vivicremation rusot noegenetic Shropshir... [(13, 14), (14, 16), (50, 52), (76, 78), (88, ...
3 uninstructiveness blintze plunging rowiness fi... [(58, 59), (87, 88), (109, 110), (122, 124), (...
4 memorialize scruffman [(0, 1), (2, 3), (18, 19)]
... ... ...
199995 emulsor treatiser [(1, 2), (11, 13)]
199996 squibling anisandrous incorrespondent vague jo... [(13, 15), (40, 43), (52, 53), (71, 73), (130,...
199997 proallotment bulletheaded uningenuousness plat... [(0, 5), (8, 9), (44, 46), (62, 65), (75, 77)]
199998 unantiquatedness sulphohalite oversoftness und... [(6, 10), (32, 35), (65, 67), (68, 71), (118, ...
199999 lenticulothalamic aerometric plastidium panell... [(14, 15), (22, 23), (31, 33), (38, 39), (46, ...
200000 rows × 2 columns
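To make the offset bookkeeping in the while loop easier to follow, here is a tiny hand-checkable sketch of the same idea (two made-up rows, one label, the same "@@@@@@" separator). It is slightly restructured relative to the code above and only illustrative; the full version also pads empty lists for trailing rows without matches:
import re

texts = ["United States here", "in the United States"]
indices = [len(t) for t in texts]        # [18, 21]
all_text = "@@@@@@".join(texts)          # rows glued into one big string

# spans are global, i.e. relative to all_text
all_matches = [m.span() for m in re.finditer("United States", all_text)]
# [(0, 13), (31, 44)]  -> row 1 starts at 18 + 6 = 24, so 31 maps back to 31 - 24 = 7

label_l, start, row = [[]], 0, 0
for match_start, match_end in all_matches:
    # advance to the next row until the match falls inside the current one
    while match_start >= start + indices[row]:
        label_l.append([])
        start += indices[row] + 6        # 6 == len("@@@@@@")
        row += 1
    label_l[-1].append((match_start - start, match_end - start))

print(label_l)  # [[(0, 13)], [(7, 20)]]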
As for timing: I tried specifically with your parameters (see the code), i.e. a DataFrame of 200k rows and 3,000 labels. The algorithm runs in just 3-5 seconds on my M1.
Problems not tackled yet, which really depend on your input:
- What if labels overlap inside one line? Then you would need to add a loop so that each iteration searches for each label separately (and that loop can be multiprocessed); see the sketch below.
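A minimal sketch of that per-label loop (the helper name find_spans_per_label is made up for illustration): each label is searched on its own, so overlapping labels inside one line are all reported, and the outer loop over labels is the part that could be distributed across processes:
import re

def find_spans_per_label(text, label_keys):
    # one pass per label, so overlapping labels cannot hide each other
    spans = []
    for key in label_keys:
        for m in re.finditer(re.escape(key), text):
            spans.append((m.start(), m.end()))
    return sorted(spans)

# 'New York' and 'York City' overlap inside 'New York City'
print(find_spans_per_label("I flew to New York City.", ["New York", "York City"]))
# [(10, 18), (14, 23)]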
CodePudding user response:
There are several efficiencies that can be made:
- Instead of doing re.finditer on each country, you could create an optimized single regular expression that searches for any of the countries in label_dict and pass that regex to your worker function. Given that you are searching for 3,000 countries, this should greatly improve the speed.
- You only need to pass an array of strings, instead of an array of dataframes, to the worker function along with the aforementioned compiled regular expression.
- You are leaving the main process a processor to use by creating a pool of size mp.cpu_count() - 1. But then you are calling starmap, which blocks until all the results have been returned, at which point the pool processes are idle. Instead you could use the imap method, which can start processing results as soon as a worker function returns something. However, the amount of processing being done by the main process may not warrant dedicating a processor to it. In the code below I am using all the available processors for the pool, but you could try leaving one free for the main process to see if that is more performant.
- The worker function only has to return a list of the spans it found; the main process will add a new column to the original dataframe using this data.
def label_data(texts, regex):
    return [
        [[match.start(), match.end()] for match in regex.finditer(text)]
        for text in texts
    ]
def label_data_parallel(df, label_dict):
    import multiprocessing as mp
    import numpy as np
    import re
    from functools import partial

    class Trie():
        """Regex::Trie in Python. Creates a Trie out of a list of words. The trie can be exported to a Regex pattern.
        The corresponding Regex should match much faster than a simple Regex union."""

        def __init__(self):
            self.data = {}

        def add(self, word):
            ref = self.data
            for char in word:
                ref[char] = char in ref and ref[char] or {}
                ref = ref[char]
            ref[''] = 1

        def dump(self):
            return self.data

        def quote(self, char):
            return re.escape(char)

        def _pattern(self, pData):
            data = pData
            if "" in data and len(data.keys()) == 1:
                return None
            alt = []
            cc = []
            q = 0
            for char in sorted(data.keys()):
                if isinstance(data[char], dict):
                    try:
                        recurse = self._pattern(data[char])
                        alt.append(self.quote(char) + recurse)
                    except:
                        cc.append(self.quote(char))
                else:
                    q = 1
            cconly = not len(alt) > 0
            if len(cc) > 0:
                if len(cc) == 1:
                    alt.append(cc[0])
                else:
                    alt.append('[' + ''.join(cc) + ']')
            if len(alt) == 1:
                result = alt[0]
            else:
                result = "(?:" + "|".join(alt) + ")"
            if q:
                if cconly:
                    result += "?"
                else:
                    result = "(?:%s)?" % result
            return result

        def pattern(self):
            return self._pattern(self.dump())

    num_cores = mp.cpu_count()
    text_chunks = np.array_split(df['text'].values.tolist(), num_cores)
    trie = Trie()
    for country in label_dict.keys():
        trie.add(country)
    regex = re.compile(trie.pattern())
    pool = mp.Pool(num_cores)
    label_spans = []
    for spans in pool.imap(partial(label_data, regex=regex), text_chunks):
        label_spans.extend(spans)
    pool.close()
    pool.join()
    df['labels'] = label_spans
    return df
def main():
    import pandas as pd

    df = pd.DataFrame({'text': [
        'I live in the United States.',
        'I live in the United States.',
        'I live in the United States.',
        'I live in the United States.',
        'I live in the United States.',
        'I live in the United States.',
        'I live in the United States.',
        'I live in the United States.',
        'France is where I live But I used to live in the United States.',
        'France is where I live But I used to live in the United States.',
        'France is where I live But I used to live in the United States.',
        'France is where I live But I used to live in the United States.',
        'France is where I live But I used to live in the United States.',
        'France is where I live But I used to live in the United States.',
        'France is where I live But I used to live in the United States.',
        'France is where I live But I used to live in the United States.',
    ]})
    label_dict = {
        'United States': 'country',
        'France': 'country',
    }
    label_data_parallel(df, label_dict)
    print(df)

if __name__ == '__main__':
    main()
Prints:
text labels
0 I live in the United States. [[14, 27]]
1 I live in the United States. [[14, 27]]
2 I live in the United States. [[14, 27]]
3 I live in the United States. [[14, 27]]
4 I live in the United States. [[14, 27]]
5 I live in the United States. [[14, 27]]
6 I live in the United States. [[14, 27]]
7 I live in the United States. [[14, 27]]
8 France is where I live But I used to live in t... [[0, 6], [49, 62]]
9 France is where I live But I used to live in t... [[0, 6], [49, 62]]
10 France is where I live But I used to live in t... [[0, 6], [49, 62]]
11 France is where I live But I used to live in t... [[0, 6], [49, 62]]
12 France is where I live But I used to live in t... [[0, 6], [49, 62]]
13 France is where I live But I used to live in t... [[0, 6], [49, 62]]
14 France is where I live But I used to live in t... [[0, 6], [49, 62]]
15 France is where I live But I used to live in t... [[0, 6], [49, 62]]