Parallel file download corruption

Time:11-28

I have the code below, which downloads multiple parts of a file in parallel and writes them into a memory-mapped file. The problem occurs in the DownloadFile() function: the download starts properly, but the file gets corrupted along the way. For example, if I download an image, sections of it are corrupted. I'm not sure whether this comes from some kind of race condition in the code or from the content range calculations for the parts. Any help on how or where the issue is occurring would be appreciated, thanks!

Minimal, Reproducible Example:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.IO.MemoryMappedFiles;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading;
using System.Threading.Tasks;
using static System.Console;

namespace ZenTester
{
internal class FileChunk
{
    public long Start { get; set; }
    public long End { get; set; }
    public FileChunk(){}
    public int Id { get; set; }

    public FileChunk(long startByte, long endByte)
    {
        Start = startByte;
        End = endByte;
    }
}

internal class RetryHandler : DelegatingHandler
{
    private int _maxRetries = 3;

    public RetryHandler(HttpMessageHandler innerHandler) : base(innerHandler) { }

    public RetryHandler(HttpMessageHandler innerHandler, int maxRetries) : base(innerHandler)
    {
        _maxRetries = maxRetries;
    }

    protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
    {
        HttpResponseMessage response = null;
        for (var i = 0; i < _maxRetries; i++)
        {
            // Release the previous failed response (and its connection) before retrying
            response?.Dispose();
            response = await base.SendAsync(request, cancellationToken);
            if (response.IsSuccessStatusCode)
            {
                return response;
            }
        }

        return response;
    }
}

public static class ZenTester
{
    private static async Task DownloadFile(string url, int parts, string outFile = null!)
    {
        var responseLength = (await WebRequest.Create(url).GetResponseAsync()).ContentLength;
        var partSize = (long)Math.Floor(responseLength / (parts + 0.0));
        var pieces = new List<FileChunk>();
        var uri = new Uri(url);
        
        WriteLine(responseLength.ToString(CultureInfo.InvariantCulture) + " TOTAL SIZE");
        WriteLine(partSize.ToString(CultureInfo.InvariantCulture) + " PART SIZE" + "\n");
        
        string filename = outFile ?? Path.GetFileName(uri.LocalPath);

        using var mmf = MemoryMappedFile.CreateFromFile(filename, FileMode.OpenOrCreate, null, responseLength);

        using var httpPool = new HttpClient(new RetryHandler(new HttpClientHandler(), 10)) {MaxResponseContentBufferSize = 1000000000};
        
        //Loop to add all the events to the queue
        for (long i = 0; i < responseLength; i += partSize)
        {
            pieces.Add(i + partSize < responseLength
                ? new FileChunk(i, i + partSize)
                : new FileChunk(i, responseLength));
        }

        // Download up to `parts` chunks at the same time
        var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = parts };

        await Parallel.ForEachAsync(pieces, parallelOptions, async (piece, cancellationToken) =>
        {
            // HttpClient is thread-safe, so all workers share the single instance
            var request = new HttpRequestMessage { RequestUri = uri };
            request.Headers.Range = new RangeHeaderValue(piece.Start, piece.End);

            using var message = await httpPool.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken);

            if (message.IsSuccessStatusCode)
            {
                await using var streamToRead = await message.Content.ReadAsStreamAsync(cancellationToken);
                await using var streams = mmf.CreateViewStream(piece.Start, piece.End - piece.Start - 1);
                await streamToRead.CopyToAsync(streams, cancellationToken);
                await streams.FlushAsync(cancellationToken);
            }
        });
        
    }
    
    public static void Main(string[] args)
    {
        var url = "https://wallpaperaccess.com/full/2159447.jpg";

        var s = DownloadFile(url, 8);
        
        s.Wait();
    }
}
}
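The range arithmetic in DownloadFile() can be checked without touching the network. The sketch below copies the chunking loop and the view size from the code above and prints, for each chunk, how many bytes the server sends compared with how many bytes the view was created to hold. The 100-byte file length and 8 parts are hypothetical values, and RangeCheck, QuestionChunks, RequestedBytes and ViewBytes are illustrative names that are not part of the original code. It assumes the server honours the Range header: HTTP byte ranges are inclusive at both ends, and a last byte position past the end of the file is treated as the last byte.

```csharp
using System;
using System.Collections.Generic;

// Offline check of the chunk arithmetic used in DownloadFile().
// RangeCheck and its methods are hypothetical helpers for illustration only.
public static class RangeCheck
{
    // Same loop as the question: each chunk is (i, i + partSize), the last one ends at length.
    public static List<(long Start, long End)> QuestionChunks(long length, int parts)
    {
        var partSize = (long)Math.Floor(length / (parts + 0.0));
        var pieces = new List<(long Start, long End)>();
        for (long i = 0; i < length; i += partSize)
        {
            pieces.Add(i + partSize < length ? (i, i + partSize) : (i, length));
        }
        return pieces;
    }

    // "bytes=start-end" is inclusive at both ends; an end past the file is clamped
    // to the last byte (assumes the server honours the Range header).
    public static long RequestedBytes(long start, long end, long length) =>
        Math.Min(end, length - 1) - start + 1;

    // Size the question passes to CreateViewStream for the same chunk.
    public static long ViewBytes(long start, long end) => end - start - 1;

    public static void Main()
    {
        const long length = 100; // hypothetical file size
        foreach (var (start, end) in QuestionChunks(length, 8))
        {
            Console.WriteLine(
                $"bytes={start}-{end}: server sends {RequestedBytes(start, end, length)}, view holds {ViewBytes(start, end)}");
        }
    }
}
```

For this input, every chunk except the last is sent as 13 bytes into an 11-byte view, and the last chunk (bytes=96-100) is sent as 4 bytes into a 3-byte view. The loop also produces 9 chunks rather than 8, because 100 is not a multiple of 8, and neighbouring ranges share one byte (bytes=0-12 and bytes=12-24 both include byte 12).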

Answer:

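I figured out that the corruption was happening because the data returned for some of the ranges was bigger than the view created for it. An HTTP byte range is inclusive at both ends, so a request for bytes Start through End returns up to End - Start + 1 bytes, while the view stream was only End - Start - 1 bytes long. I fixed the issue by adding a read-write access modifier to both the memory-mapped file and the view stream. Then I changed the size of the view stream to the length of the content returned by the SendAsync request: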

var streams = mmf.CreateViewStream(piece.Start, message.Content.Headers.ContentLength!.Value,
                        MemoryMappedFileAccess.ReadWrite);
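The Content-Length fix sizes each view after the response arrives. An alternative is to make the chunks themselves follow the inclusive semantics of the Range header, so that each view is exactly End - Start + 1 bytes and no two chunks overlap. The sketch below shows one way to do that, assuming .NET 6 or later; ChunkPlanner, PlanChunks, CoversExactly and WriteChunkAsync are hypothetical names, not an existing API. It also checks for 206 Partial Content, since a server that does not support range requests ignores the header and replies 200 with the whole file, which must not be written into a single chunk's view.

```csharp
using System;
using System.Collections.Generic;
using System.IO.MemoryMappedFiles;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading;
using System.Threading.Tasks;

// Sketch: non-overlapping, inclusive chunk ranges whose views match the bytes the server sends.
// ChunkPlanner and its methods are hypothetical names, not an existing API.
public static class ChunkPlanner
{
    // Splits bytes 0..length-1 into `parts` inclusive ranges; the last one absorbs the remainder.
    public static List<(long Start, long End)> PlanChunks(long length, int parts)
    {
        if (length <= 0) throw new ArgumentOutOfRangeException(nameof(length));
        if (parts <= 0) throw new ArgumentOutOfRangeException(nameof(parts));

        var count = (int)Math.Min(parts, length); // never more chunks than bytes
        var partSize = length / count;
        var chunks = new List<(long Start, long End)>(count);
        for (var i = 0; i < count; i++)
        {
            var start = i * partSize;
            var end = i == count - 1 ? length - 1 : start + partSize - 1;
            chunks.Add((start, end));
        }
        return chunks;
    }

    // True when the ranges are contiguous, non-overlapping and cover bytes 0..length-1.
    public static bool CoversExactly(List<(long Start, long End)> chunks, long length)
    {
        long next = 0;
        foreach (var (start, end) in chunks)
        {
            if (start != next || end < start) return false;
            next = end + 1;
        }
        return next == length;
    }

    // Downloads one inclusive range straight into a view of exactly End - Start + 1 bytes.
    public static async Task WriteChunkAsync(HttpClient client, Uri uri, MemoryMappedFile mmf,
        long start, long end, CancellationToken cancellationToken)
    {
        using var request = new HttpRequestMessage(HttpMethod.Get, uri);
        request.Headers.Range = new RangeHeaderValue(start, end);
        using var response = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken);

        // A 200 here would mean the range was ignored and the body is the whole file
        if (response.StatusCode != HttpStatusCode.PartialContent)
            throw new HttpRequestException($"Expected 206 for bytes={start}-{end}, got {(int)response.StatusCode}");

        await using var source = await response.Content.ReadAsStreamAsync(cancellationToken);
        await using var view = mmf.CreateViewStream(start, end - start + 1, MemoryMappedFileAccess.ReadWrite);
        await source.CopyToAsync(view, cancellationToken);
    }

    public static void Main()
    {
        var chunks = PlanChunks(100, 8); // hypothetical 100-byte file, 8 parts
        foreach (var (start, end) in chunks)
            Console.WriteLine($"bytes={start}-{end} -> view of {end - start + 1} bytes");
        Console.WriteLine($"covers file exactly: {CoversExactly(chunks, 100)}");
    }
}
```

With the hypothetical 100-byte file and 8 parts, this yields seven 12-byte ranges followed by a 16-byte range (bytes=84-99). WriteChunkAsync could be called once per planned chunk from the Parallel.ForEachAsync loop in place of the original body; the memory-mapped file still needs a capacity equal to the content length.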