Connection problems while using Parallel.ForEach Loop


I have a foreach loop that executes a set of statements, part of which saves an image from a URL to Azure Storage. I have to do this for a large data set, so I converted the foreach loop into a Parallel.ForEach loop.

 Parallel.ForEach(listSkills, item =>
 {
     // some business logic
     var b = getImageFromUrl(item.Url);
     Stream ms = new MemoryStream(b);

     saveImage(ms);
     // more business logic
 });

 private static byte[] getByteArray(Stream input)
 {
   using (MemoryStream ms = new MemoryStream())
   {
     input.CopyTo(ms);
     return ms.ToArray();
   }
 }

 public static byte[] getImageFromUrl(string url)
 {
    HttpWebRequest request = null;
    HttpWebResponse response = null;
    byte[] b = null;
    request = (HttpWebRequest)WebRequest.Create(url);
    response = (HttpWebResponse)request.GetResponse();
    if (request.HaveResponse)
    {
      if (response.StatusCode == HttpStatusCode.OK)
       {
          Stream receiveStream = response.GetResponseStream();
          b = getByteArray(receiveStream);
       }
    }

    return b;
 }

 public static void saveImage(Stream fileContent)
 {
  fileContent.Seek(0, SeekOrigin.Begin);
  byte[] bytes = getByteArray(fileContent);
  CloudBlockBlob blob = null; // blob reference initialization omitted here
  blob.UploadFromByteArrayAsync(bytes, 0, bytes.Length).Wait();
 }

However, there are instances where I get the error below and the image is not saved:

An existing connection was forcibly closed by the remote host.

Here is the stack trace:

   at System.Net.Sockets.NetworkStream.Read(Span`1 buffer)
   at System.Net.Security.SslStream.<FillBufferAsync>d__183`1.MoveNext()
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Net.Security.SslStream.<ReadAsyncInternal>d__181`1.MoveNext()
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Net.Security.SslStream.Read(Byte[] buffer, Int32 offset, Int32 count)
   at System.IO.Stream.Read(Span`1 buffer)
   at System.Net.Http.HttpConnection.Read(Span`1 destination)
   at System.Net.Http.HttpConnection.ContentLengthReadStream.Read(Span`1 buffer)
   at System.Net.Http.HttpBaseStream.Read(Byte[] buffer, Int32 offset, Int32 count)
   at System.IO.Stream.CopyTo(Stream destination, Int32 bufferSize)
   at Utilities.getByteArray(Stream input) in D:\repos\SampleProj\Sample\Helpers\CH.cs:line 238
   at Utilities.getImageFromUrl(String url) in D:\repos\SampleProj\Sample\Helpers\CH.cs:line 178

I am guessing this may be because I am not using locks. I am unsure whether locks should be used within a Parallel.ForEach loop.

CodePudding user response:

According to another question on Stack Overflow, here are the potential causes of An existing connection was forcibly closed by the remote host. :

  • You are sending malformed data to the application (which could include sending an HTTPS request to an HTTP server)
  • The network link between the client and server is going down for some reason
  • You have triggered a bug in the third-party application that caused it to crash
  • The third-party application has exhausted system resources

Since only some of your requests are affected, I think we can exclude the first one. This could, of course, be a network issue, in which case it would happen from time to time depending on the quality of the network between you and the server.

Unless you find indications of an Azure Storage bug from other users, there is a high probability that your calls are consuming too much of the remote server's resources (connections/data) at the same time. Servers and proxies limit how many connections they can handle simultaneously (especially from the same client machine).
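As a client-side aside: if you stay on the HttpWebRequest stack, the number of simultaneous connections per host is also capped by ServicePointManager.DefaultConnectionLimit. Raising it only helps if the server can cope, so pair it with an explicit cap on concurrency. A minimal sketch (the value 16 is an arbitrary example):

```csharp
using System.Net;

// Allow more simultaneous connections per host for the HttpWebRequest stack.
// (The default was 2 per host for client apps on .NET Framework; on .NET Core
// this setting has limited effect because HttpClient manages its own pool.)
ServicePointManager.DefaultConnectionLimit = 16;
```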

Depending on the size of your listSkills list, your code may launch a large number of requests in parallel (as many as your thread pool allows), possibly flooding the server.

You could at least limit the number of parallel tasks launched using MaxDegreeOfParallelism, like this:

Parallel.ForEach(listSkills,
    new ParallelOptions { MaxDegreeOfParallelism = 4 },
    item =>
    {
         // some business logic
         var b = getImageFromUrl(item.Url);
         Stream ms = new MemoryStream(b);
    
         saveImage(ms);
         // more business logic
    });

CodePudding user response:

You can control parallelism with PLINQ. Note that WithDegreeOfParallelism must be applied before the operation it throttles, and that a terminal operation such as ForAll is required, because Select alone is lazy and never executes:

listSkills.AsParallel()
          .WithDegreeOfParallelism(10)
          .ForAll(item =>
          {
              var b = getImageFromUrl(item.Url);
              using var ms = new MemoryStream(b);
              saveImage(ms);
          });

But Parallel.ForEach is not good for IO because it is designed for CPU-intensive work; if you use it for IO-bound operations, especially web requests, you waste thread-pool threads that sit blocked while waiting for responses.

You can use asynchronous web request methods like HttpWebRequest.GetResponseAsync. Alternatively, you can use thread-synchronization constructs, for example a SemaphoreSlim: it works like a gate that lets X tasks through at a time, while the rest wait until one of the busy slots is released. First make your download method async, like this (not an ideal solution, but an improvement):

public static async Task getImageFromUrl(SemaphoreSlim semaphore, string url)
{
    try
    {
        var request = (HttpWebRequest)WebRequest.Create(url);
        byte[] b = null;
        using (var response = await request.GetResponseAsync().ConfigureAwait(false))
        {
            // your logic
        } 
    }
    catch (Exception ex)
    {
        // handle exp
    }
    finally
    {
        // release
        semaphore.Release();
    }
}

and then:

var tasks = new List<Task>();
using (var semaphore = new SemaphoreSlim(10))
{
    foreach (var url in urls)
    {
        // await here until there is a room for this task
        await semaphore.WaitAsync();
        tasks.Add(getImageFromUrl(semaphore, url));
    }
    // await for the rest of tasks to complete
    await Task.WhenAll(tasks);
}

You should not use Parallel or Task.Run; instead you can have an async handler method like:

public async Task handleResponse(Task<HttpResponseMessage> responseTask)
{
    HttpResponseMessage response = await responseTask;
    // Process your data
}

and then use Task.WhenAll like:

Task[] requests = myList.Select(l => client.GetAsync(l.Url)) // client is a shared HttpClient
                        .Select(t => handleResponse(t))
                        .ToArray();
await Task.WhenAll(requests);

In the end there are several solutions for your scenario, but avoid Parallel.ForEach for IO and use one of the async alternatives instead.

CodePudding user response:

There are several problems with this code:

  • Parallel.ForEach is meant for data parallelism, not IO. The code is blocking all CPU cores while waiting for IO to complete
  • HttpWebRequest is a wrapper over HttpClient in .NET Core. Using HttpWebRequest is inefficient and far more complex than needed.
  • HttpClient can retrieve or post stream contents, which means there's no reason to load the contents into memory. HttpClient is also thread-safe and meant to be reused.
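To illustrate that last point, a single shared HttpClient instance can serve all concurrent downloads; a minimal sketch (the class and method names are illustrative, not from the original post):

```csharp
using System.Net.Http;
using System.Threading.Tasks;

class Downloader
{
    // One HttpClient for the whole application; creating a new instance
    // per request can exhaust sockets under load.
    private static readonly HttpClient Client = new HttpClient();

    public static async Task<byte[]> DownloadAsync(string url)
    {
        // GetByteArrayAsync throws on non-success status codes,
        // so no manual status check is needed here.
        return await Client.GetByteArrayAsync(url);
    }
}
```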

There are several ways to execute many IO operations concurrently in .NET Core.

.NET 6

In the current Long-Term-Support version of .NET, .NET 6, this can be done using Parallel.ForEachAsync. Scott Hanselman shows how easy it is to use it for API calls.

You can retrieve the data directly with GetByteArrayAsync :

record CopyRequest(Uri sourceUri, Uri targetUri);
...
var requests = new List<CopyRequest>();
//Load some source/target URLs

var client = new HttpClient();
await Parallel.ForEachAsync(requests, async (req, ct) =>
{
    var bytes = await client.GetByteArrayAsync(req.sourceUri, ct);
    var blob = new CloudAppendBlob(req.targetUri);
    await blob.UploadFromByteArrayAsync(bytes, 0, bytes.Length);
});

A better option would be to retrieve the data as a stream and send it directly to the blob :

await Parallel.ForEachAsync(requests, async (req, ct) =>
{
    var response = await client.GetAsync(req.sourceUri,
                       HttpCompletionOption.ResponseHeadersRead, ct);
    using var sourceStream = await response.Content.ReadAsStreamAsync(ct);
    var blob = new CloudAppendBlob(req.targetUri);
    await blob.UploadFromStreamAsync(sourceStream);
});

HttpCompletionOption.ResponseHeadersRead causes GetAsync to return as soon as the response headers are received, without buffering any of the response data.

.NET 3.1

In older .NET Core versions (which are reaching End-of-Life in a few months) you can use e.g. an ActionBlock with a degree of parallelism greater than 1:

var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 };

var copyBlock = new ActionBlock<CopyRequest>(async req =>
{
    var response = await client.GetAsync(req.sourceUri,
                       HttpCompletionOption.ResponseHeadersRead);
    using var sourceStream = await response.Content.ReadAsStreamAsync();
    var blob = new CloudAppendBlob(req.targetUri);
    await blob.UploadFromStreamAsync(sourceStream);
}, options);

// Feed the block, then signal completion and wait for all copies to finish:
foreach (var req in requests) copyBlock.Post(req);
copyBlock.Complete();
await copyBlock.Completion;

The block classes in the TPL Dataflow library can be used to construct processing pipelines similar to a shell script pipeline, with each block piping its output to the next block.
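As a sketch of such a pipeline (the upload method below is a stub standing in for whichever blob SDK call you use, and the degree-of-parallelism values are arbitrary examples), a TransformBlock that downloads each URI can be linked to an ActionBlock that uploads the result:

```csharp
using System;
using System.Net.Http;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Pipeline
{
    static readonly HttpClient client = new HttpClient();

    public static async Task RunAsync(Uri[] sources)
    {
        // Stage 1: download each URL (up to 8 at a time).
        var download = new TransformBlock<Uri, byte[]>(
            uri => client.GetByteArrayAsync(uri),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });

        // Stage 2: upload the downloaded bytes (up to 4 at a time).
        var upload = new ActionBlock<byte[]>(
            bytes => UploadToBlobAsync(bytes), // hypothetical upload stub
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

        // Propagate completion so awaiting the last block drains the pipeline.
        download.LinkTo(upload,
            new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var uri in sources) download.Post(uri);
        download.Complete();
        await upload.Completion;
    }

    static Task UploadToBlobAsync(byte[] bytes) => Task.CompletedTask; // stub
}
```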
