I receive large batches of data, 20K or 50K records at a time, and this is a continuous process.
I am trying to split whatever data I receive into smaller chunks. For example, I divide 20K records into chunks of 5K records each, so I get 4 chunks.
I need to pass all 4 chunks to a function that processes them in parallel, because another batch of records may arrive in the meantime, which I also need to divide into chunks and process in parallel.
However, it keeps loading and does not process the records as expected. It keeps looping over the same first batch of data instead of finishing its chunks and picking up the next batch.
Here is what I tried:
import json
import threading

def divide_chunks(l, n):
    # split list l into consecutive chunks of at most n items
    small_msgs = []
    for i in range(0, len(l), n):
        small_msgs.append(l[i:i + n])
    return small_msgs
def process_data(data, i):
    # process one chunk of data
    try:
        # data processing goes here according to my requirement
        # it may take 20-25 seconds per chunk, which is why I am planning
        # to process the chunks in parallel
        pass
    except exceptions.BadRequest as exc:
        print(json.dumps({'error': str(exc)}))
    return True
# msgs is the bulk data received continuously from the server; I keep appending to msgs
chunk_msgs = divide_chunks(msgs, 5000)
# clear msgs so the next data can be appended after the previous data has been chunked
msgs.clear()
for n in range(0, len(chunk_msgs)):
    threading.Thread(target=process_data, args=(chunk_msgs[n], n)).start()
CodePudding user response:
From your vague problem description, I can only conclude that the problem is that the work is not being done in parallel.
In CPython, threads cannot run Python code fully in parallel because of the GIL (global interpreter lock), so you should use a process pool instead. A process pool can run the work in parallel, but how much that helps depends on what the "work" is. Here is a minimal example that gets it working with a process pool:
import multiprocessing

def divide_chunks(l, n):
    # split list l into consecutive chunks of at most n items
    small_msgs = []
    for i in range(0, len(l), n):
        small_msgs.append(l[i:i + n])
    return small_msgs
def process_data(data, i):
    # process one chunk of data
    try:
        print(data, i)
        # data processing goes here according to my requirement
        # it may take 20-25 seconds per chunk, which is why I am planning
        # to process the chunks in parallel
    except Exception as e:
        print("exception", e)
    return True
if __name__ == "__main__":  # only imports and function/class defs before this line.
    msgs = [1, 2, 3, 4, 5, 6, 7, 8, 9]
    # msgs is the bulk data received continuously from the server; I keep appending to msgs
    chunk_msgs = divide_chunks(msgs, 3)
    # clear msgs so the next data can be appended after the previous data has been chunked
    msgs.clear()
    with multiprocessing.Pool(len(chunk_msgs)) as pool:
        pool.starmap(process_data, [(chunk_msgs[n], n) for n in range(len(chunk_msgs))])
Output (the order of the lines may vary between runs):
[1, 2, 3] 0
[4, 5, 6] 1
[7, 8, 9] 2
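
Since the question says batches keep arriving, it may be worth creating the pool once and reusing it for every batch rather than building a new pool each time. The following is only a rough sketch of that idea using `concurrent.futures.ProcessPoolExecutor` from the standard library. The `process_batch` helper, the `incoming_batches` list and the body of `process_data` are made-up stand-ins, not part of the original code, and `max_workers=4` is an arbitrary choice.

```python
from concurrent.futures import ProcessPoolExecutor, as_completed


def divide_chunks(items, size):
    """Split items into consecutive lists of at most `size` elements."""
    return [items[i:i + size] for i in range(0, len(items), size)]


def process_data(data, index):
    """Placeholder for the real 20-25 second job (assumption: returns a small result)."""
    return index, len(data)


def process_batch(pool, batch, chunk_size):
    """Hypothetical helper: submit every chunk of one batch and collect the results."""
    chunks = divide_chunks(batch, chunk_size)
    futures = [pool.submit(process_data, chunk, n) for n, chunk in enumerate(chunks)]
    # as_completed yields futures in completion order, so sort for a stable result
    return sorted(future.result() for future in as_completed(futures))


if __name__ == "__main__":
    # Hypothetical stand-in for batches arriving from the server
    incoming_batches = [list(range(20)), list(range(7))]
    # The pool is created once and reused for every batch
    with ProcessPoolExecutor(max_workers=4) as pool:
        for batch in incoming_batches:
            print(process_batch(pool, batch, 5))
```

Here `process_batch` waits for all chunks of a batch before returning. If the next batch should start while the previous one is still running, one option is to keep the futures around and collect them later instead of calling `result()` right away.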