How to split a list into multiple batches and assign each batch to a thread worker

Time:11-02

I want to split a large list (about 2000 items) into multiple batches and run each batch on a different thread in parallel.

The text of the problem is this:

We need to calculate the sum of squares for a list containing more than 2000 items. Ex: if the list was [1,2,3,4,5] -> result will be 55

Make sure to implement and time each of the approaches:

15.1. Try using a single-threaded approach for calculating the sum of squares for a list of items from 0 to 2000.
15.2. Try using the multithreading approach by splitting the list of items into batches that are calculated in parallel.
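As a quick sanity check on the expected values, here is a minimal sketch that computes the sum of squares directly and compares it with the closed-form formula n(n+1)(2n+1)/6. The function names `sum_of_squares` and `closed_form` are hypothetical and not part of the assignment.

```python
def sum_of_squares(items):
    """Return the sum of x**2 for every x in items."""
    return sum(x * x for x in items)


def closed_form(n):
    """Sum of squares of 0..n using n(n+1)(2n+1)/6 (hypothetical helper)."""
    return n * (n + 1) * (2 * n + 1) // 6


print(sum_of_squares([1, 2, 3, 4, 5]))  # 55, the example from the problem text
print(sum_of_squares(range(2001)))      # items from 0 to 2000
print(closed_form(2000))                # same value, computed without a loop
```

Any single-threaded or multithreaded implementation should reproduce these numbers, which makes the closed form a handy reference when timing the two approaches.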

This is what I have come up with so far:

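import logging
from concurrent.futures import ThreadPoolExecutor
from time import perf_counter


def divide_list(l, n):

    # step through l in strides of n, yielding one batch per stride
    for i in range(0, len(l), n):
        yield l[i : i + n]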


def task():
    # didn't know what to write here
    pass


if __name__ == "__main__":

    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")

    # Single thread

    t1_start = perf_counter()

    squares_single = [x**2 for x in range(2001)]
    print(sum(squares_single))

    t1_stop = perf_counter()

    print("Elapsed time using a single thread in seconds:", t1_stop - t1_start)

    # Multithreading

    t2_start = perf_counter()

    squares_threading = [x**2 for x in range(2001)]
    # print("Square threading: ", squares_threading)
    x = list(divide_list(squares_threading, 200))

    sum = 0
    n_threads = len(x)
    with ThreadPoolExecutor(n_threads) as executor:
        for list in x:
            for result in executor.map(task, list):
                sum += [x**2 for x in list]
        print(sum)
    t2_stop = perf_counter()

    print("Elapsed time using multithreading in seconds:", t2_stop - t2_start)




CodePudding user response:

I'm new to ThreadPoolExecutor. After reading https://docs.python.org/3/library/concurrent.futures.html, I adapted the ThreadPoolExecutor example that uses the submit() method (as opposed to map()). My results are below. I removed some of your code (such as the perf_counter timing) to keep my version simpler.

import concurrent.futures
import logging

def sum_squares(items, start, end):
  # Sum the squares of items[start:end]; slicing clamps an end past the list length
  total = 0
  for x in items[start:end]:
    total += x**2

  return total

def divide_list(l, n):

    # looping till length l
    for i in range(0, len(l), n):
        yield l[i : i + n]


def task():
    # didn't know what to write here
    pass

if __name__ == "__main__":
  format = "%(asctime)s: %(message)s"
  logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")

  # Single thread
  l = list(range(2001))

  squares_single = [x**2 for x in l]
  print(sum(squares_single))
  print( sum_squares( l, 0, len(l)))  # let's make sure the sum_squares function matches your results
  # Multithreading

  total = 0
  n_threads = 10
  futures = []

  with concurrent.futures.ThreadPoolExecutor(n_threads) as executor:
      # n_threads also serves as the batch size here, so each task sums 10 items
      for start in range(0, len(l), n_threads):
          futures.append(executor.submit( sum_squares,  l, start, start + n_threads  ) )

      # add up the partial sums as each batch finishes
      for future in concurrent.futures.as_completed( futures ):
        total += future.result()

      print(total)

There are three print calls in there, and the results are:

2668667000
2668667000
2668667000
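For comparison, the map()-based version that the question was attempting could look roughly like the sketch below, with the timing added back. This is only one possible arrangement: the names `sum_squares_batch`, `single_threaded`, and `multi_threaded` and the `max_workers=10` default are assumptions made for illustration, while the batch size of 200 comes from the question.

```python
from concurrent.futures import ThreadPoolExecutor
from time import perf_counter


def divide_list(items, batch_size):
    """Yield consecutive slices of items, each at most batch_size long."""
    for i in range(0, len(items), batch_size):
        yield items[i : i + batch_size]


def sum_squares_batch(batch):
    """Worker task (hypothetical name): sum of squares for one batch."""
    return sum(x * x for x in batch)


def single_threaded(items):
    """Baseline: one pass over the whole list on the calling thread."""
    return sum(x * x for x in items)


def multi_threaded(items, batch_size=200, max_workers=10):
    """Split items into batches and sum each batch on a pool thread."""
    batches = list(divide_list(items, batch_size))
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # map() calls sum_squares_batch once per batch and yields
        # the partial sums in the same order as the batches
        return sum(executor.map(sum_squares_batch, batches))


if __name__ == "__main__":
    numbers = list(range(2001))

    start = perf_counter()
    single = single_threaded(numbers)
    print("single thread:", single, "seconds:", perf_counter() - start)

    start = perf_counter()
    multi = multi_threaded(numbers)
    print("thread pool:", multi, "seconds:", perf_counter() - start)
```

The key difference from the question's code is that the task receives a whole batch and returns its partial sum, so map() is called once over the list of batches rather than once per item. Note that for CPU-bound pure-Python work like this, threads in standard CPython are limited by the global interpreter lock, so the threaded timing should not be expected to beat the single-threaded one.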