I have a Windows service that reads data from the database and processes this data using multiple REST API calls.
Originally, this service ran on a timer where it would read unprocessed data from the database and process it using multiple threads limited using SemaphoreSlim
. This worked well except that the database read had to wait for all processing to finish before reading again.
ServicePointManager.DefaultConnectionLimit = 10;
Original that works:
// Runs every 5 seconds on a timer
private void ProcessTimer_Elapsed(object sender, ElapsedEventArgs e)
{
var hasLock = false;
try
{
Monitor.TryEnter(timerLock, ref hasLock);
if (hasLock)
{
ProcessNewData();
}
else
{
log.Info("Failed to acquire lock for timer."); // This happens all of the time
}
}
finally
{
if (hasLock)
{
Monitor.Exit(timerLock);
}
}
}
public void ProcessNewData()
{
var unproceesedItems = GetDatabaseItems();
if (unproceesedItems.Count > 0)
{
var downloadTasks = new Task[unproceesedItems.Count];
var maxThreads = new SemaphoreSlim(semaphoreSlimMinMax, semaphoreSlimMinMax); // semaphoreSlimMinMax = 10 is max threads
for (var i = 0; i < unproceesedItems .Count; i )
{
maxThreads.Wait();
var iClosure = i;
downloadTasks[i] =
Task.Run(async () =>
{
try
{
await ProcessItemsAsync(unproceesedItems[iClosure]);
}
catch (Exception ex)
{
// handle exception
}
finally
{
maxThreads.Release();
}
});
}
Task.WaitAll(downloadTasks);
}
}
To improve efficiency, I rewrite the service to run GetDatabaseItems
in a separate thread from the rest so that there is a ConcurrentDictionary
of unprocessed items between them that GetDatabaseItems
fills and ProcessNewData
empties.
The problem is that while 10 unprocessed items are send to ProcessItemsAsync
, they are processed two at a time instead of all 10.
The code inside of ProcessItemsAsync
calls var response = await client.SendAsync(request);
where the delay occurs. All 10 threads make it to this code but come out of it two at a time. None of this code changed between the old version and the new.
Here is the code in the new version that did change:
public void Start()
{
ServicePointManager.DefaultConnectionLimit = maxSimultaneousThreads; // 10
// Start getting unprocessed data
getUnprocessedDataTimer.Interval = getUnprocessedDataInterval; // 5 seconds
getUnprocessedDataTimer.Elapsed = GetUnprocessedData; // writes data into a ConcurrentDictionary
getUnprocessedDataTimer.Start();
cancellationTokenSource = new CancellationTokenSource();
// Create a new thread to process data
Task.Factory.StartNew(() =>
{
try
{
ProcessNewData(cancellationTokenSource.Token);
}
catch (Exception ex)
{
// error handling
}
}, TaskCreationOptions.LongRunning
);
}
private void ProcessNewData(CancellationToken token)
{
// Check if task has been canceled.
while (!token.IsCancellationRequested)
{
if (unprocessedDictionary.Count > 0)
{
try
{
var throttler = new SemaphoreSlim(maxSimultaneousThreads, maxSimultaneousThreads); // maxSimultaneousThreads = 10
var tasks = unprocessedDictionary.Select(async item =>
{
await throttler.WaitAsync(token);
try
{
if (unprocessedDictionary.TryRemove(item.Key, out var item))
{
await ProcessItemsAsync(item);
}
}
catch (Exception ex)
{
// handle error
}
finally
{
throttler.Release();
}
});
Task.WhenAll(tasks);
}
catch (OperationCanceledException)
{
break;
}
}
Thread.Sleep(1000);
}
}
Environment
- .NET Framework 4.7.1
- Windows Server 2016
- Visual Studio 2019
Attempts to fix:
I tried the following with the same bad result (two await client.SendAsync(request)
completing at a time):
- Set Max threads and
ServicePointManager.DefaultConnectionLimit
to 30 - Manually create threads using
Thread.Start()
- Replace async/await pattern with sync HttpClient calls
- Call data processing using
Task.Run(async () =>
andTask.WaitAll(downloadTasks);
- Replace the new long-running thread for
ProcessNewData
with a timer
What I want is to run GetUnprocessedData
and ProcessNewData
concurrently with an HttpClient connection limit of 10 (set in config) so that 10 requests are processed at the same time.
Note: the issue is similar to HttpClient.GetAsync executes only 2 requests at a time? but the DefaultConnectionLimit
is increased and the service runs on a Windows Server. It also creates more than 2 connections when original code runs.
Update
I went back to the original project to make sure it still worked, it did. I added a new timer to perform some unrelated operations and the httpClient
issue came back. I removed the timer, everything worked. I added a new thread to do parallel processing, the problem came back.
CodePudding user response:
This is not a direct answer to your question, but a suggestion for simplifying your service that could make the debugging of any problem easier. My suggestion is to implement the producer-consumer pattern using an iterator for producing the unprocessed items, and a parallel loop for consuming them. Ideally the parallel loop would have async delegates, but since you are targeting the .NET Framework you don't have access to the .NET 6 method Parallel.ForEachAsync
. So I will suggest the slightly wasteful approach of using a synchronous parallel loop that blocks threads. You could use either the Parallel.ForEach
method, or the PLINQ like in the example below:
private IEnumerable<Item> Iterator(CancellationToken token)
{
while (true)
{
Task delayTask = Task.Delay(5000, token);
foreach (Item item in GetDatabaseItems()) yield return item;
delayTask.GetAwaiter().GetResult();
}
}
public void Start()
{
//...
ThreadPool.SetMinThreads(degreeOfParallelism, Environment.ProcessorCount);
new Thread(() =>
{
try
{
Partitioner
.Create(Iterator(token), EnumerablePartitionerOptions.NoBuffering)
.AsParallel()
.WithDegreeOfParallelism(degreeOfParallelism)
.WithCancellation(token)
.ForAll(item => ProcessItemAsync(item).GetAwaiter().GetResult());
}
catch (OperationCanceledException) { } // Ignore
}).Start();
}
The Iterator
fetches unprocessed items from the database in batches, and yields them one by one. The database won't be hit more frequently than once every 5 seconds.
The PLINQ query is going to fetch a new item from the Iterator
each time it has a worker available, according to the WithDegreeOfParallelism
policy. The setting EnumerablePartitionerOptions.NoBuffering
ensures that it won't try to fetch more items in advance.
The ThreadPool.SetMinThreads
is used in order to boost the availability of ThreadPool
threads, since the PLINQ is going to use lots of them. Without it the ThreadPool
will not be able to satisfy the demand immediately, although it will gradually inject more threads and eventually will catch up. But since you already know how many threads you'll need, you can configure the ThreadPool
from the start.
In case you dislike the idea of blocking threads, you can find a simple substitute of the Parallel.ForEachAsync
here, based on the TPL Dataflow library. It requires installing a NuGet package.
CodePudding user response:
The issue turned out to be the place where ServicePointManager.DefaultConnectionLimit
is set.
In the version where HttpClient
was only doing two requests at a time, ServicePointManager.DefaultConnectionLimit
was being set before the threads were being created but after the HttpClient
was initialized.
Once I moved it into the constructor before the HttpClient
is initialized, everything started working.
Thank you very much to @Theodor Zoulias for the help.
TLDR; Set ServicePointManager.DefaultConnectionLimit
before initializing the HttpClient
.