ThreadPoolExecutor finishing before all threads are actually finished

Time:01-15

I have 28 methods that run in a pool. ThreadPoolExecutor (an Executor subclass that uses a pool of threads to execute calls asynchronously) creates 28 threads, one per method. During execution, each thread uses Plotly to generate a chart. The problem is that the ThreadPoolExecutor seems to finish before all threads have actually finished. The results are hit and miss: typically 4 charts (threads) are not created, i.e. they never finish. This is my code:

from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=len(methods))

for method in methods:
    pool.submit(method, commits, system_name, reset, chart_output)

pool.shutdown(wait=True)

Each of the executed methods looks like this:

import plotly.graph_objects as go

# reset_db_data, retrieve_db_data and save_df_to_db are the asker's own helpers (not shown).
def commits_by_date(commits, system_name, reset, chart_output):
    collection_name = "commits_by_date"
    reset_db_data(reset, system_name, collection_name)
    date_commits = retrieve_db_data(system_name, collection_name)

    if len(date_commits) == 0:
        date_commits = commits.groupby('commit_date')[['sha']].count()
        date_commits = date_commits.rename(columns={'sha': 'commit_count'})
        date_commits.insert(0, "system_name", system_name)
        date_commits = date_commits.reset_index()
        save_df_to_db(date_commits, collection_name)

    if chart_output:
        fig = go.Figure([go.Scatter(
            x=date_commits.commit_date,
            y=date_commits.commit_count,
            text=date_commits.commit_count,
            fill='tozeroy')])
        fig.update_layout(
            title='Commits by Date',
            yaxis_title='Commits Count')
        fig.write_html('commits_by_date.html', auto_open=True)
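A detail worth checking in the submission loop above: `pool.submit` returns a `concurrent.futures.Future`, and if the submitted callable raises, the exception is stored in that Future instead of being printed. Because the loop discards the return value, a method that fails partway through (for example, while writing its chart) looks as if it never finished. The sketch below keeps the futures and calls `result()` on each one, which re-raises any stored exception. `ok_method`, `failing_method` and `run_all` are hypothetical stand-ins, not the asker's code.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-ins for the chart methods in the question.
def ok_method(commits, system_name, reset, chart_output):
    return f"{system_name}: ok"

def failing_method(commits, system_name, reset, chart_output):
    raise RuntimeError("chart could not be written")

methods = [ok_method, failing_method, ok_method]

def run_all(methods, *args):
    """Run every method in a pool and report which ones raised."""
    results = {}
    failures = {}
    with ThreadPoolExecutor(max_workers=len(methods)) as pool:
        # Keep each Future (keyed to its method's index) so it can be inspected.
        futures = {pool.submit(m, *args): i for i, m in enumerate(methods)}
    # Leaving the `with` block waits for all tasks, like shutdown(wait=True).
    for future, index in futures.items():
        try:
            results[index] = future.result()  # re-raises the worker's exception
        except Exception as exc:
            failures[index] = exc
    return results, failures

results, failures = run_all(methods, None, "demo", False, True)
for index, exc in failures.items():
    print(f"method #{index} ({methods[index].__name__}) failed: {exc!r}")
```

If some of the 28 methods show up in `failures`, the pool did wait for them; they simply raised.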

Answer:

It depends on what method is doing. When using concurrency, shared mutable state must be avoided. The function you are trying to execute concurrently seems to access the Plotly graph, which is shared mutable state.

To avoid problems, only reentrant code should run concurrently, and any part of the code that mutates shared state should be executed synchronously.
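One general mechanism for running the shared-state part synchronously is a `threading.Lock`: the reentrant work runs in parallel, and only the mutation is serialized. This is a different technique from the function split described next; the sketch uses a hypothetical list, `shared_log`, as the shared state. (In CPython a single `list.append` is already thread-safe, so the lock matters most for multi-step updates; the list only keeps the example small.)

```python
import threading
from concurrent.futures import ThreadPoolExecutor

shared_log = []                 # hypothetical shared mutable state
shared_lock = threading.Lock()  # guards every access to shared_log

def work(n):
    # Reentrant part: uses only local variables, safe to run concurrently.
    value = sum(i * i for i in range(n))
    # Non-reentrant part: only one thread at a time may mutate shared_log.
    with shared_lock:
        shared_log.append((n, value))
    return value

with ThreadPoolExecutor(max_workers=4) as pool:
    values = list(pool.map(work, range(10)))  # results in input order

print(sorted(shared_log) == list(zip(range(10), values)))
```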

One way to achieve this is to break method down into two functions: the first does the heavy work you want to parallelize (and must be reentrant), and the second plots the results synchronously.

Here is an example of how you could achieve this with Python's concurrent.futures module:

from concurrent.futures import ThreadPoolExecutor, as_completed

def heavy_work(arg):
    # Some heavy work (`slow_function` is a placeholder)...
    result = slow_function(arg)
    return result

def plot(result, figure):
    # Plot the result to a shared figure;
    # must be executed synchronously.
    # (`figure.plot` is a placeholder for the real plotting call.)
    figure.plot(result)

args = [...]  # List of arguments to `heavy_work`
figure = ...  # The shared figure

# Submit work to be executed concurrently; leaving the `with`
# block waits for every submitted task to finish.
with ThreadPoolExecutor() as pool:
    futures = [pool.submit(heavy_work, arg) for arg in args]

# Serialize the calls to `plot` on the main thread
for future in as_completed(futures):
    result = future.result()
    plot(result, figure)
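Applied to the question, each chart method would be split into a compute step that returns its data and a render step that runs on the main thread. The sketch below is runnable with the standard library only: `count_by_date`, `count_by_author` and `render` are hypothetical names, plain dictionaries replace the pandas DataFrame, and `render` just builds a string where the real code would build a Plotly figure and call `write_html`.

```python
from collections import Counter
from concurrent.futures import ThreadPoolExecutor

# Hypothetical commit records standing in for the `commits` DataFrame.
commits = [
    {"sha": "a1", "commit_date": "2023-01-01", "author": "ana"},
    {"sha": "b2", "commit_date": "2023-01-01", "author": "bo"},
    {"sha": "c3", "commit_date": "2023-01-02", "author": "ana"},
]

# Compute steps: depend only on their input, so they are safe to run in threads.
def count_by_date(commits):
    return "commits_by_date", Counter(c["commit_date"] for c in commits)

def count_by_author(commits):
    return "commits_by_author", Counter(c["author"] for c in commits)

# Render step: stands in for building a Plotly figure and writing HTML.
def render(name, counts):
    rows = ", ".join(f"{key}={value}" for key, value in sorted(counts.items()))
    return f"{name}: {rows}"

compute_steps = [count_by_date, count_by_author]

with ThreadPoolExecutor(max_workers=len(compute_steps)) as pool:
    futures = [pool.submit(step, commits) for step in compute_steps]

# All rendering happens here, one chart at a time, on the main thread.
charts = [render(*future.result()) for future in futures]
for chart in charts:
    print(chart)
```

Iterating over `futures` in submission order (instead of `as_completed`) keeps the output order stable; either choice works, because every `result()` call and every render happens on the main thread.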

Answer:

The answer is to pause after each submission with time.sleep:

import time

for method in methods:
    pool.submit(method, commits, system_name, reset, chart_output)
    time.sleep(as_many_you_want)  # delay in seconds between submissions; choose the value yourself