Improve code speed with multiprocessing


I'm self-studying Python and this is my first script. I work on analyzing logs from servers, and usually I need to analyze a full day of logs. I created a script (this is an example with simplified logic) just to check speed. With plain sequential code, analyzing 20 million rows takes about 12-13 minutes; I need to process 200 million rows in 5 minutes. What I tried:

  1. Multiprocessing (I ran into a shared-memory issue, which I think I fixed). But the result is 300K rows = 20 sec, no matter how many processes I use. (PS: I also need to control the number of processes in advance.)
  2. Threading (I found it doesn't give any speed-up: 300K rows = 2 sec, but plain sequential code is the same, 300K = 2 sec).
  3. Asyncio (I thought the script was slow because it has to read many files). The result was the same as threading: 300K = 2 sec. In the end I think all three of my scripts are incorrect and don't work as intended.

PS: I'm trying to avoid third-party Python modules (like pandas), because they would make the script harder to run on different servers. I'd rather stick to the standard library.

Please help me check the first one, multiprocessing.

import csv
import os
from multiprocessing import Process, Queue, Value, Manager

file = {"hcs.log", "hcs1.log", "hcs2.log", "hcs3.log"}

def argument(m, a, n):
    proc_num = os.getpid()
    a_temp_m = a["vod_miss"]
    a_temp_h = a["vod_hit"]
    with open(os.getcwd() + '/' + m, newline='') as hcs_1:
        hcs_2 = csv.reader(hcs_1, delimiter=' ')
        for j in hcs_2:
            if j[3].find('MISS') != -1:
                a_temp_m[n] = a_temp_m[n] + 1
            elif j[3].find('HIT') != -1:
                a_temp_h[n] = a_temp_h[n] + 1
    a["vod_miss"][n] = a_temp_m[n]
    a["vod_hit"][n] = a_temp_h[n]

if __name__ == '__main__':
    procs = []
    manager = Manager()
    vod_live_cuts = manager.dict()
    i = "vod_hit"
    ii = "vod_miss"
    cpu = 1
    n = 1
    vod_live_cuts[i] = manager.list([0] * cpu)
    vod_live_cuts[ii] = manager.list([0] * cpu)
    for m in file:
        proc = Process(target=argument, args=(m, vod_live_cuts, (n-1)))
        procs.append(proc)
        proc.start()
        if n >= cpu:
            n = 1
            proc.join()
        else:
            n += 1
    [proc.join() for proc in procs]
    [proc.close() for proc in procs]

I expect each file to be processed by def argument in an independent process, and all results to end up in the dict vod_live_cuts. For each process I added an independent slot in the lists inside the dict; I thought this would let the processes update the values without interfering with each other. But maybe it's the wrong approach :(

CodePudding user response:

Using IPC is costly, so only use "shared objects" for saving the final result, not for intermediate updates while parsing the file.

Limiting the number of processes is done with a multiprocessing.Pool. The following code uses it to reach the maximum hard-disk speed; you only need to post-process the results.

You can only parse data as fast as your HDD can read it (typically 30-80 MB/s), so if you need to improve performance further you should use an SSD or RAID0 for higher disk speed; you cannot get much faster than this without changing your hardware.
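To sanity-check that the disk really is the limit, a rough approach (just a sketch; the read_speed helper is illustrative, not part of the script below) is to time a raw sequential read of one log file and compare the MB/s against your parsing throughput. The number will be inflated if the file is already in the OS cache:

import os
import time

def read_speed(path, chunk_size=1024 * 1024):
    # read the whole file in 1 MB chunks and return the raw throughput in MB/s
    size = os.path.getsize(path)
    start = time.perf_counter()
    with open(path, 'rb') as f:
        while f.read(chunk_size):
            pass
    return size / (time.perf_counter() - start) / 1e6

# e.g. print(read_speed('hcs.log'))

The reworked script using a Pool: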

import csv
import os
from multiprocessing import Manager, Pool

file = {"hcs.log", "hcs1.log", "hcs2.log", "hcs3.log"}

def argument(m, a):
    proc_num = os.getpid()
    a_temp_m_n = 0  # make it local to the process,
    a_temp_h_n = 0  # as shared lists use IPC on every access
    with open(os.getcwd() + '/' + m, newline='') as hcs_1:
        hcs_2 = csv.reader(hcs_1, delimiter=' ')
        for j in hcs_2:
            if j[3].find('MISS') != -1:
                a_temp_m_n = a_temp_m_n + 1
            elif j[3].find('HIT') != -1:
                a_temp_h_n = a_temp_h_n + 1
    a["vod_miss"].append(a_temp_m_n)
    a["vod_hit"].append(a_temp_h_n)

if __name__ == '__main__':
    manager = Manager()
    vod_live_cuts = manager.dict()
    i = "vod_hit"
    ii = "vod_miss"
    cpu = 1
    vod_live_cuts[i] = manager.list()
    vod_live_cuts[ii] = manager.list()
    with Pool(cpu) as pool:
        tasks = []
        for m in file:
            task = pool.apply_async(argument, args=(m, vod_live_cuts))
            tasks.append(task)
        for task in tasks:
            task.get()
    print(list(vod_live_cuts[i]))
    print(list(vod_live_cuts[ii]))
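A further simplification is possible (again only a sketch, not the code above; the names files and count_hits are illustrative): since only the per-file totals matter, each worker can return its (hit, miss) pair and the parent can post-process the returned values, which removes the Manager and all IPC during parsing:

import csv
import os
from multiprocessing import Pool

files = ["hcs.log", "hcs1.log", "hcs2.log", "hcs3.log"]

def count_hits(path):
    # count HIT/MISS occurrences in one log file and return the totals
    hits = misses = 0
    with open(os.path.join(os.getcwd(), path), newline='') as f:
        for row in csv.reader(f, delimiter=' '):
            if 'MISS' in row[3]:
                misses += 1
            elif 'HIT' in row[3]:
                hits += 1
    return hits, misses

if __name__ == '__main__':
    cpu = 4  # number of worker processes, adjust to the machine
    with Pool(cpu) as pool:
        per_file = pool.map(count_hits, files)  # one (hits, misses) tuple per file
    print(per_file)
    print("total hits:", sum(h for h, _ in per_file))
    print("total misses:", sum(m for _, m in per_file))

pool.map blocks until all files are processed and returns the results in the same order as the input list, so the post-processing is just a sum over the returned tuples.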