I'm trying to write a Python script that uses the subprocess module to copy files from one S3 bucket to another. To improve performance, I'm trying to run separate sync commands with different prefixes in parallel.
With what I've tried so far, the script doesn't terminate and I'm not sure the subprocesses are running concurrently.
import subprocess

prefix = ['prefix1', 'prefix2', 'prefix3']
source_bucket = 's3://source'
dest_bucket = 's3://dest'

commands = []
for p in prefix:
    command = 'aws s3 sync source_bucket' + p + ' dest_bucket'
    commands.append(command)

procs = [subprocess.Popen(i, shell=True, stdout=subprocess.PIPE) for i in commands]

for p in procs:
    p.wait()
Is there a better way of doing this? Any help is appreciated.
CodePudding user response:
Because you're passing in subprocess.PIPE, each aws process will block once the OS pipe buffer fills up and nothing is reading from it. You'll need to run a separate process to read the output from each aws instance. One possibility is to use Python's multiprocessing:
import subprocess
import multiprocessing

def worker(command, queue):
    # Don't use shell here, we can avoid the overhead
    proc = subprocess.Popen(command, stdout=subprocess.PIPE, bufsize=1, universal_newlines=True)
    # Read from the aws command output and send it off to the queue as soon
    # as it's available
    for line in proc.stdout:
        queue.put(line.rstrip("\r\n"))
    # Notify the listener that we're done
    queue.put(None)

def main():
    # Move all work to a function so it's multiprocessing safe, see
    # https://docs.python.org/3/library/multiprocessing.html#multiprocessing-programming
    # Note: Adding trailing slashes so "source_bucket" + "prefix" is a valid S3 URI
    prefixes = ['prefix1/', 'prefix2/', 'prefix3/']
    source_bucket = 's3://source/'
    dest_bucket = 's3://dest/'

    # Create a queue to gather final messages from each worker
    queue = multiprocessing.Queue()
    procs = []
    for p in prefixes:
        # Pass in --no-progress so the progress messages aren't shown;
        # displaying those messages is complicated and requires quite a bit
        # of work to make sure they don't interfere with each other.
        # Correct the command syntax here to use all the variables.
        # The prefix needs to be passed to the dest URI as well so the same
        # structure is maintained.
        # Use an argv-style call here so we can avoid bringing the shell into this
        command = ['aws', 's3', 'sync', '--no-progress', source_bucket + p, dest_bucket + p]
        # Hand off the work to a worker to read from the pipe to prevent each
        # spawned aws instance from blocking
        proc = multiprocessing.Process(target=worker, args=(command, queue))
        proc.start()
        procs.append(proc)

    # Read from the queue to show the output
    left = len(procs)
    while left > 0:
        msg = queue.get()
        if msg is None:
            # This means a worker is done
            left -= 1
        else:
            # Just print out the output, doing it in one process to prevent any
            # collision possibilities
            print(msg)

    # Clean up
    for proc in procs:
        proc.join()

if __name__ == "__main__":
    main()
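As an aside, if you don't actually need to capture the sync output in Python, a simpler variant is to skip the pipes entirely and let each aws process write straight to the terminal; then nothing can block on a full pipe buffer and your original wait() loop works. A minimal sketch, assuming the aws CLI is installed and using the same placeholder bucket and prefix names as above:

import subprocess

prefixes = ['prefix1/', 'prefix2/', 'prefix3/']
source_bucket = 's3://source/'
dest_bucket = 's3://dest/'

# Start all syncs concurrently; without stdout=subprocess.PIPE the output goes
# straight to the terminal, so no pipe can fill up and block a process
procs = [
    subprocess.Popen(['aws', 's3', 'sync', '--no-progress',
                      source_bucket + p, dest_bucket + p])
    for p in prefixes
]

# Wait for every sync to finish and check the exit codes
for proc in procs:
    proc.wait()
    if proc.returncode != 0:
        print('sync failed with exit code', proc.returncode)

The --no-progress flag is still useful here, since interleaved progress bars from several concurrent aws processes are hard to read.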