Multithreading or multiprocessing to process large data in chunks

Time:01-14

I receive large batches of data, 20K or 50K records at a time, and this is a continuous process.

I am trying to split whatever data I get into smaller chunks. For example, I divide 20K records into chunks of 5K records each, so I get 4 chunks.

I need to pass all 4 chunks to a function that processes them in parallel, because another batch of records may arrive in the meantime, which I also need to divide into chunks and process in parallel.

But it keeps loading and does not process the records as expected: it loops over the same first batch continuously, and never finishes its chunks to pick up the next batch of chunks.

Here is what I tried:

import json
import threading

def divide_chunks(l, n):
    small_msgs = []
    for i in range(0, len(l), n):
        small_msgs.append(l[i:i + n])
    return small_msgs

def process_data(data, i):
    # process one chunk of data
    try:
        # data processing here according to my requirement
        # it may take 20-25 seconds, which is why I am planning
        # for parallel processing
        pass  # placeholder so the try block is valid Python
    except exceptions.BadRequest as exc:
        print(json.dumps({'error': str(exc)}))
    return True

# msgs is the bulk data continuously received from the server and appended to this list
chunk_msgs = divide_chunks(msgs, 5000)

# clear msgs so the next data can be appended after chunking the previous data
msgs.clear()

for n in range(0, len(chunk_msgs)):
    threading.Thread(target=process_data, args=(chunk_msgs[n],n)).start()

CodePudding user response:

From your vague problem description, I can only conclude that the problem is that the work is not getting done in parallel.

Threads cannot run Python code fully in parallel because of the GIL (global interpreter lock), so for CPU-bound work you should use a process pool instead. A process pool can run the work in parallel, although how much that helps depends on what the "work" is. Here is a minimal example that gets it working with a process pool:

import multiprocessing

def divide_chunks(l, n):
    small_msgs = []
    for i in range(0, len(l), n):
        small_msgs.append(l[i:i + n])
    return small_msgs

def process_data(data, i):
    # process one chunk of data
    try:
        print(data, i)
        # data processing here according to my requirement
        # it may take 20-25 seconds, which is why parallel
        # processing is planned
    except Exception as e:
        print("exception", e)
    return True

if __name__ == "__main__":  # only imports and function/class defs before this line.
    msgs = [1,2,3,4,5,6,7,8,9]
    # msgs is the bulk data continuously received from the server and appended to this list
    chunk_msgs = divide_chunks(msgs, 3)

    # clear msgs so the next data can be appended after chunking the previous data
    msgs.clear()
    with multiprocessing.Pool(len(chunk_msgs)) as pool:
        pool.starmap(process_data, [(chunk_msgs[n],n) for n in range(len(chunk_msgs))])

Output:

[1, 2, 3] 0
[4, 5, 6] 1
[7, 8, 9] 2
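
Since the question says batches keep arriving, one possible variation is to create the pool once and reuse it for every batch instead of building a new pool each time. This is only a rough sketch under assumptions: `process_chunk` is a hypothetical stand-in that just sums the chunk (the real 20-25 second job is not shown in the question), `run_batches` is a made-up helper name, and the two batches in `incoming` are invented sample data. It uses `concurrent.futures.ProcessPoolExecutor` from the standard library rather than `multiprocessing.Pool`.

```python
from concurrent.futures import Executor, ProcessPoolExecutor


def divide_chunks(items, size):
    """Split items into consecutive lists of at most `size` elements."""
    return [items[i:i + size] for i in range(0, len(items), size)]


def process_chunk(chunk, index):
    """Hypothetical stand-in for the real job: return the chunk index and its sum."""
    return index, sum(chunk)


def run_batches(batches, chunk_size, executor: Executor):
    """Chunk every batch and process its chunks on one shared executor."""
    results = []
    for batch in batches:
        chunks = divide_chunks(batch, chunk_size)
        # submit() returns immediately, so the chunks of a batch run concurrently
        futures = [executor.submit(process_chunk, c, i) for i, c in enumerate(chunks)]
        # result() blocks until that chunk is done and re-raises worker exceptions
        results.append([f.result() for f in futures])
    return results


if __name__ == "__main__":
    # two invented batches standing in for data received from the server
    incoming = [list(range(1, 10)), list(range(10, 16))]
    # the pool is created once and shared by all batches
    with ProcessPoolExecutor(max_workers=4) as pool:
        for batch_result in run_batches(incoming, 3, pool):
            print(batch_result)
```

Note that this sketch waits for all chunks of one batch before submitting the next batch. If batches must overlap as well, the futures could be collected without calling `result()` right away, but that depends on how the data actually arrives.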