How to run AWS S3 sync command concurrently for different prefixes using Python


I'm trying to write a Python script that uses the subprocess module to copy files from one S3 bucket to another. To improve performance, I want to run separate sync commands for different prefixes in parallel.

With what I've tried so far, the script doesn't terminate, and I'm not sure the subprocesses are running concurrently.

import subprocess

prefix = ['prefix1','prefix2','prefix3']
source_bucket = 's3://source'
dest_bucket = 's3://dest'
commands = []

for p in prefix:
    command = 'aws s3 sync source_bucket' + p + ' dest_bucket'
    commands.append(command)

procs = [subprocess.Popen(i, shell=True, stdout=subprocess.PIPE) for i in commands]

for p in procs:
    p.wait()

Is there a better way of doing this? Any help is appreciated.

CodePudding user response:

Because you're passing in subprocess.PIPE but never reading from it, each aws process can block once its pipe buffer fills up, which is why the script never terminates. You'll need something reading from each aws instance's output while it runs. One possibility is to use Python's multiprocessing:

import subprocess
import multiprocessing

def worker(command, queue):
    # Don't use the shell here; we can avoid the overhead
    proc = subprocess.Popen(command, stdout=subprocess.PIPE, bufsize=1, universal_newlines=True)
    # Read from the aws command's output and put each line on the queue
    # as soon as it's available
    for line in proc.stdout:
        queue.put(line.rstrip("\r\n"))
    
    # Notify the listener that we're done
    queue.put(None)


def main():
    # Move all work to a function so it's multiprocessing safe, see
    # https://docs.python.org/3/library/multiprocessing.html#multiprocessing-programming
    # Note: Adding trailing slash so "source_bucket" + "prefix" is a valid S3 URI
    prefixes = ['prefix1/', 'prefix2/', 'prefix3/']
    source_bucket = 's3://source/'
    dest_bucket = 's3://dest/'

    # Create a queue to gather final messages from each worker
    queue = multiprocessing.Queue()
    procs = []

    for p in prefixes:
        # Pass in --no-progress so the progress messages aren't shown;
        # displaying them is complicated and requires quite a bit of work
        # to make sure they don't interfere with each other
        # Correct the command syntax here to use all the variables
        # Need to pass in the prefix to the dest URI as well so the same structure is
        # maintained
        # Use an argv-style call here so we can avoid bringing the shell into this
        command = ['aws', 's3', 'sync', '--no-progress', source_bucket + p, dest_bucket + p]
        # Hand off the work to a worker to read from the pipe to prevent each
        # spawned aws instance from blocking
        proc = multiprocessing.Process(target=worker, args=(command, queue))
        proc.start()
        procs.append(proc)

    # Read from the Queue to show the output
    left = len(procs)
    while left > 0:
        msg = queue.get()
        if msg is None:
            # This means a worker is done
            left -= 1
        else:
            # Just print out the output, doing it in one process to prevent any
            # collision possibilities
            print(msg)
    
    # Clean up
    for proc in procs:
        proc.join()

if __name__ == "__main__":
    main()
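
An alternative, if you don't need to stream each command's output as it arrives, is to let subprocess.run capture the output and drive the calls from a thread pool. Here is a minimal sketch under that assumption, reusing the same bucket URIs and prefixes as above; threads are enough because the actual copying is done by the aws child processes, not by Python:

import subprocess
from concurrent.futures import ThreadPoolExecutor

def sync_prefix(source_bucket, dest_bucket, prefix):
    # subprocess.run waits for the command to finish and collects its
    # stdout/stderr, so nothing blocks on an unread pipe
    result = subprocess.run(
        ['aws', 's3', 'sync', '--no-progress', source_bucket + prefix, dest_bucket + prefix],
        capture_output=True, text=True)
    return prefix, result.returncode, result.stdout

def main():
    source_bucket = 's3://source/'
    dest_bucket = 's3://dest/'
    prefixes = ['prefix1/', 'prefix2/', 'prefix3/']

    # One thread per prefix; each thread just waits on its own aws process
    with ThreadPoolExecutor(max_workers=len(prefixes)) as pool:
        futures = [pool.submit(sync_prefix, source_bucket, dest_bucket, p) for p in prefixes]
        for future in futures:
            prefix, returncode, output = future.result()
            print(f"{prefix} finished with exit code {returncode}")
            print(output)

if __name__ == "__main__":
    main()

The trade-off is that each sync's output is only printed once it finishes, whereas the multiprocessing version above streams it line by line.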