How can I create parallel processing so that multiple processes download and write?


I wrote a multiprocess downloading program to download a big ISO file. The idea is to cut the ISO file into 4 parts using the Range argument in the request, and to open 4 processes to download them.

import  requests
import  multiprocessing

class my_download(object): 
    def __init__(self,url):
        self.url = url
        self.process_num = multiprocessing.cpu_count()
        self.fn = url.split('/')[-1]
        url_headers = requests.head(self.url)
        self.total = int(url_headers.headers['Content-Length'])
        self.ranges = self.get_file_range()

    def get_file_range(self):
        ranges = []
        download_num = int(self.total/self.process_num)
        for i in range(self.process_num):
            if i == self.process_num-1:
                ranges.append((download_num*i,''))
            else:
                ranges.append((download_num*i,download_num*(i+1)))
        return ranges

    def run_task(self,i):
        print('process {} start'.format(str(i)))
        fn = '/tmp/' + self.fn + "-" + str(i)
        headers={'Range': 'Bytes=%s-%s' % self.ranges[i],'Accept-Encoding':'*'}
        r = requests.get(self.url, headers=headers,stream=True)
        with open(fn,'wb') as fh:
            for chunk in r.iter_content(chunk_size=1024):
                if chunk:
                    fh.write(chunk)
        print('process {} end'.format(str(i)))

    def run(self):
        pool = multiprocessing.Pool(processes = self.process_num)
        for i in range(self.process_num):
            pool.apply_async(self.run_task,args = (i,))
        pool.close()
        pool.join()


url = "https://chuangtzu.ftp.acc.umu.se/debian-cd/current/amd64/iso-dvd/debian-11.0.0-amd64-DVD-1.iso"
downloader = my_download(url)
downloader.run()

The whole ISO file is downloaded as 4 parts, and I have to concatenate them into a single file.

It is inefficient to merge all 4 downloaded parts into the same file with the code below:

    flist = ['/tmp/' + self.fn + "-" + str(i) for i in range(4)]
    with open("/tmp/" + self.fn, 'wb') as newf:
        for filename in flist:
            with open(filename,'rb') as hf:
                newf.write(hf.read())
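
As a side note, a chunked merge along the lines below (a sketch using shutil.copyfileobj; the paths and attribute names just mirror the snippet above) at least avoids loading each whole part into memory with hf.read(), although it still rewrites every byte once:

    import shutil

    # Stream each downloaded part into the target file in 1 MiB chunks
    # instead of reading an entire part into memory at once.
    flist = ['/tmp/' + self.fn + "-" + str(i) for i in range(4)]
    with open("/tmp/" + self.fn, 'wb') as newf:
        for filename in flist:
            with open(filename, 'rb') as hf:
                shutil.copyfileobj(hf, newf, length=1024 * 1024)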

How can I write into the same file from multiple processes?
I can prepare a blank file whose size is equal to the resource in __init__.

    self.fh = open(self.fn,'wb')
    self.fh.seek(self.size-1)
    self.fh.write(b'\0')
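
As an aside, an equivalent way to preallocate the blank file (a sketch, assuming self.size already holds the Content-Length) is to call truncate; on most systems the extended region reads back as zeros and the file is created sparse:

    self.fh = open(self.fn, 'wb')
    # truncate() resizes the file to self.size bytes without writing them out;
    # the new bytes are typically zero-filled by the filesystem.
    self.fh.truncate(self.size)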

A difficult job remains. The whole size is 3947823104 bytes, and the program cuts it into 4 ranges:

ranges
[(0, 986955776), (986955776, 1973911552), (1973911552, 2960867328), (2960867328, '')]

All content belonging to a range should be written into the blank file at its specified position, with multiple processes writing at the same time.
I tried to position the file pointer with seek(ranges[i][0]) so that the i-th process writes the content it downloads from the resource; here is my unsuccessful attempt:

import  requests,os
import  multiprocessing    

class my_download(object): 
    def __init__(self,url):
        self.url = url
        self.process_num = multiprocessing.cpu_count()
        self.fn = url.split('/')[-1]
        url_headers = requests.head(self.url)
        self.size = int(url_headers.headers['Content-Length'])
        self.ranges = self.get_file_range()
        self.fh = open(self.fn,'wb')
        self.fh.seek(self.size-1)
        self.fh.write(b'\0')
        self.fh.flush()

    def get_file_range(self):
        ranges = []
        download_num = int(self.size/self.process_num)
        for i in range(self.process_num):
            if i == self.process_num-1:
                ranges.append((download_num*i,''))
            else:
                ranges.append((download_num*i,download_num*(i+1)))
        return ranges

    def run_task(self,i):
        print('process {} start'.format(str(i)))
        fn = '/tmp/' + self.fn + "-" + str(i)
        headers={'Range': 'Bytes=%s-%s' % self.ranges[i],'Accept-Encoding':'*'}
        r = requests.get(self.url, headers=headers,stream=True)
        self.fh.seek(self.ranges[i][0])
        for chunk in r.iter_content(chunk_size=1024):
            if chunk:
                self.fh.write(chunk)
        self.fh.flush()
        print('process {} end'.format(str(i)))

    def run(self):
        pool = multiprocessing.Pool(processes = self.process_num)
        for i in range(self.process_num):
            pool.apply_async(self.run_task,args = (i,))
        pool.close()
        pool.join()
        self.fh.close()


url = "https://chuangtzu.ftp.acc.umu.se/debian-cd/current/amd64/iso-dvd/debian-11.0.0-amd64-DVD-1.iso"
downloader = my_download(url)
downloader.run()

How can I create proper parallel processing so that multiple processes download and write?

CodePudding user response:

You can probably achieve some improvement by downloading segments of the file concurrently, and for that multithreading should be sufficient. The number of threads to use is something you will need to experiment with, to see how it affects not only the network performance but also the concurrent writing of the disk output. For the latter, the suggestion of @showkey to use a memory-mapped file is a good one, but to share a memory-mapped file across processes you need to be on a Linux-like platform, which I am guessing you might be running on (you are supposed to tag your question with the platform whenever you tag it with multiprocessing, just so I don't have to guess). On Windows the memory map cannot be shared across processes, so there you are forced to use threading. Since each thread retrieves its segment in chunks, it will be releasing the GIL periodically while it waits on the network, allowing the other threads to run. The code below lets you switch between threads and processes simply by changing the import statement (it should be self-explanatory), although again it is not clear whether the most efficient pool size is the number of cores you have.

By using a memory-mapped file, there is no need for any final merging of the parts. In the __init__ method, set self.fn to the final filename you want.

import requests, os
# This uses multithreading:
#from multiprocessing.dummy import Pool
# This uses multiprocessing:
from multiprocessing.pool import Pool
import sys
import mmap
import time

#NUM_PROCESSES = os.cpu_count() # You must experiment with this
NUM_PROCESSES = 16

def init_pool(the_mm):
    global mm
    mm = the_mm

class my_download(object):
    def __init__(self,url):
        self.url = url
        self.process_num = NUM_PROCESSES
        self.fn = url.split('/')[-1] # Or whatever final filename you want
        url_headers = requests.head(self.url)
        self.size = int(url_headers.headers['Content-Length'])
        self.ranges = self.get_file_range()

    def get_file_range(self):
        ranges = []
        download_num = int(self.size/self.process_num)
        for i in range(self.process_num):
            if i == self.process_num-1:
                ranges.append((download_num*i,''))
            else:
                ranges.append((download_num*i,download_num*(i+1)))
        print('ranges:', ranges)
        return ranges

    def run_task(self,i):
        print('process {} start'.format(str(i)))
        headers={'Range': 'Bytes=%s-%s' % self.ranges[i],'Accept-Encoding':'*'}
        print(headers, flush=True)
        r = requests.get(self.url, headers=headers,stream=True)
        offset = self.ranges[i][0]
        for chunk in r.iter_content(chunk_size=1024):
            if chunk:
                size = len(chunk)
                mm[offset:offset+size] = chunk
                offset += size
        print('process {} end'.format(str(i)))

    def run(self):
        with open(self.fn, 'wb') as f:
            if sys.platform != 'win32':
                # If not Windows then memory-mapped file size cannot be greater than disk file size:
                f.seek(self.size - 1)
            f.write(b'\0') # But for Windows a 1-byte file will suffice
        # Re-open:
        with open(self.fn, 'rb+') as f:
            # Create memory-mapped file on top of disk file:
            with mmap.mmap(f.fileno(), self.size) as mm:
                pool = Pool(processes = self.process_num, initializer=init_pool, initargs=(mm,))
                for i in range(self.process_num):
                    pool.apply_async(self.run_task,args = (i,))
                pool.close()
                pool.join()

start = time.time()
url = "https://chuangtzu.ftp.acc.umu.se/debian-cd/current/amd64/iso-dvd/debian-11.0.0-amd64-DVD-1.iso"
downloader = my_download(url)
downloader.run()
print(time.time() - start)

The following program also uses a memory-mapped file but downloads the file in a single request; it took 157 seconds, compared with 85 seconds for the program above on my desktop:

import mmap
import requests
import sys
import time

start = time.time()
url = "https://chuangtzu.ftp.acc.umu.se/debian-cd/current/amd64/iso-dvd/debian-11.0.0-amd64-DVD-1.iso"
fn = url.split('/')[-1]
url_headers = requests.head(url)
size = int(url_headers.headers['Content-Length'])
with open(fn, 'wb') as f:
    if sys.platform != 'win32':
        # If not Windows then memory-mapped file size cannot be greater than disk file size:
        f.seek(size - 1)
    f.write(b'\0') # But for Windows a 1-byte file will suffice
# Reopen:
with open(fn, 'rb+') as f:
    # memory-map the file, size 0 means whole file
    with mmap.mmap(f.fileno(), length=size) as mm:
        r = requests.get(url, stream=True)
        offset = 0
        for chunk in r.iter_content(chunk_size=1024):
            if chunk:
                size = len(chunk)
                mm[offset:offset+size] = chunk
                offset += size
print(time.time() - start)

The following program downloads the file in a single request and does not use a memory-mapped file for writing the output; it took 239 seconds.

import requests
import sys
import time

start = time.time()
url = "https://chuangtzu.ftp.acc.umu.se/debian-cd/current/amd64/iso-dvd/debian-11.0.0-amd64-DVD-1.iso"
fn = url.split('/')[-1]
with open(fn, 'wb') as f:
    r = requests.get(url, stream=True)
    for chunk in r.iter_content(chunk_size=1024):
        if chunk:
            f.write(chunk)
print(time.time() - start)

Summary:

Download file in 16 segments using memory-mapped file: 85 seconds
Download file in 1 segment using memory-mapped file: 157 seconds
Download file in 1 segment without using memory-mapped file: 239 seconds.

In addition to trying different values for NUM_PROCESSES, you might want to try increasing the chunk_size argument used with the iter_content method, although it may have no effect.
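
For example, a quick way to compare chunk sizes in isolation (a rough sketch, not part of the programs above; it streams only roughly the first 100 MB of the same ISO into memory for each candidate size) would be:

import time
import requests

url = "https://chuangtzu.ftp.acc.umu.se/debian-cd/current/amd64/iso-dvd/debian-11.0.0-amd64-DVD-1.iso"

# Time how long streaming roughly the first 100 MB takes with a few chunk sizes.
for chunk_size in (1024, 64 * 1024, 1024 * 1024):
    headers = {'Range': 'Bytes=0-100000000', 'Accept-Encoding': '*'}
    start = time.time()
    r = requests.get(url, headers=headers, stream=True)
    received = 0
    for chunk in r.iter_content(chunk_size=chunk_size):
        if chunk:
            received += len(chunk)
    print(chunk_size, received, time.time() - start)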
