Home > Enterprise >  Python Parallelize os.walk() for performance on a local SSD
Python Parallelize os.walk() for performance on a local SSD

Time:12-10

I am trying to parallelize the following code to get it to run faster:

import os

fs = 0
for root, dirs, files in os.walk(os.getcwd()):
  for file in files:
    fp = os.path.join(root, file)
    fs  = os.path.getsize(fp)
      
print(fs)

I tried the following:

import os
from concurrent.futures import ProcessPoolExecutor, as_completed

def main():

  total_size = 0
  with ProcessPoolExecutor() as executor:
    for root, dirs, files in os.walk(os.getcwd()):
      futures = []
      for file in files:
        file_path = os.path.join(root, file)
        futures.append(executor.submit(os.path.getsize, file_path))

      for future in as_completed(futures):
        total_size  = future.result()

  print(total_size)
  
if __name__ == '__main__':
  main()

but it is far slower than my original code (iterating over files on an SSD). I'm on Windows using Python 3.8. Any idea on how to properly speed it up?

CodePudding user response:

The second code is slower due to the inter-process communication. To understand why, we need to understand how things works in the first place.

During the first run, the OS truly use the SSD and put things in a RAM cache. Then, subsequent executions of this code are significantly faster because of that (no need to interact with the SSD device at all). On my machine the first run takes about 10 second serially while the second one takes about 5 seconds. Regarding the OS, the file system and the actual SSD driver, a system lock may be used so this operation may not be faster in parallel (this is done so to prevent any corruption and also to ease the development of some part of the system stack). Historically, some OS even used an horribly inefficient enter image description here

Each line is a process. The process with the PID 26172 is the master one (starting others and waiting for them). The brown part is the CPU time (activity) of the processes. The profiling is split in small piece of 1ms. Light grey and light green pieces are the pieces where there was a context-switch or a synchronization and dark green one are the piece where the process was only doing computation.

What we can see is that the worker are relatively active but they are a lot of time where they are starving or waiting for something (typically the access to IO-related resources). The main thread is slowed down by workers since the operation down not perfectly scale. In fact, the calls to os.path.getsize are about 10-15% slower, mainly because CreateFileW does not scale (certainly due to system locks). The parallel computation cannot be faster than the speed of os.walk. On some platforms with many cores and a scalable OS file subsystem, os.walk may be a bottleneck. If so, the file tree can be travelled using recursive tasks at the expense of a significantly more complex code.

In practice, on Windows, it turns out that the os.path.getsize implementation is apparently not efficient. Based on this post, using GetFileAttributesEx should be significantly faster. The bad news is that the alternative function os.stat also use the same method. Thus, if you want to do this operation efficiently, it is certainly a good idea to write a C extension for that. Such a C extension can also benefit from using threads while not having any issues with the GIL, pickling, nor the IPC.

CodePudding user response:

You should batch the files, otherwise probably the overhead of the futures is larger than the save. For 1 million small files, on Linux I got with the following code a ~2x speedup than without ProcessPoolExecutor. You might need to adjust the number of files you set in a batch for your system (10k in my example).

import os
from concurrent.futures import ProcessPoolExecutor, as_completed

def getsize_list(files):
  fs = 0
  for file in files:
    fs  = os.path.getsize(file)
  return fs

def main():
  total_size = 0
  with ProcessPoolExecutor() as executor:
    for root, dirs, files in os.walk(os.getcwd()):
      futures = []

      current_list = []
      for file in files:
        file_path = os.path.join(root, file)
        current_list.append(file_path)
        if len(current_list)==10_000:
          futures.append(executor.submit(getsize_list, current_list))
          current_list = []
      futures.append(executor.submit(getsize_list, current_list))

      for future in as_completed(futures):
        total_size  = future.result()

  print(total_size)

if __name__ == '__main__':
  main()
  • Related