I have a C# .NET program that uses an external API to process events for real-time stock market data. I use the API callback feature to populate a ConcurrentDictionary with the data it receives on a stock-by-stock basis.
I have a set of algorithms that each run in a constant loop until a terminal condition is met. They are called like this:
Task.Run(() => ExecutionLoop1());
Task.Run(() => ExecutionLoop2());
...
Task.Run(() => ExecutionLoopN());
Each one of those functions calls SnapTotals():
public void SnapTotals()
{
foreach (KeyValuePair<string, MarketData> kvpMarketData in
new ConcurrentDictionary<string, MarketData>(Handler.MessageEventHandler.Realtime))
{
...
The Handler.MessageEventHandler.Realtime object is the ConcurrentDictionary that is updated in real-time by the external API.
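For context, the callback wiring looks essentially like this (a simplified sketch; the MarketData shape here is invented for illustration):

```csharp
using System.Collections.Concurrent;

public record MarketData(decimal Last, long Volume); // simplified, illustrative shape

public static class MessageEventHandler
{
    public static readonly ConcurrentDictionary<string, MarketData> Realtime = new();

    // The external API invokes a callback like this on every tick; the
    // indexer overwrites the previous entry, so each symbol always holds
    // the latest quote.
    public static void OnMarketEvent(string symbol, MarketData data)
        => Realtime[symbol] = data;
}
```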
At a certain specific point in the day, there is an instant burst of data that comes in from the API. That is the precise time I want my ExecutionLoop() functions to do some work.
As I've grown the program and added more of those execution loop functions, and grown the number of elements in the ConcurrentDictionary, the performance of the program as a whole has seriously degraded. Specifically, those ExecutionLoop() functions all seem to freeze up and take much longer to meet their terminal condition than they should.
I added some logging to all of the functions above, and to the function that updates the ConcurrentDictionary. From what I can gather, the ExecutionLoop() functions appear to access the ConcurrentDictionary so often that they block the API from updating it with real-time data. The loops are dependent on that data to meet their terminal condition, so they cannot complete.
I'm stuck trying to figure out a way to re-architect this. I would like the thread that updates the ConcurrentDictionary to have a higher priority, but the message events are handled from within the external API. I don't know if ConcurrentDictionary was the right type of data structure to use, or what the alternative could be, because obviously a regular Dictionary would not work here. Or is there a way to "pause" my execution loops for a few milliseconds to allow the market data feed to catch up? Or something else?
CodePudding user response:
Your basic approach is sound except for one fatal flaw: all of your loops are hitting the same dictionary at the same time via iterators, gets, and sets. So you must do one thing: in SnapTotals you must iterate over a copy of the concurrent dictionary.
When you iterate over Handler.MessageEventHandler.Realtime, or even over new ConcurrentDictionary<string, MarketData>(Handler.MessageEventHandler.Realtime), you are using the ConcurrentDictionary<>'s iterator, which, even though it is thread-safe, is going to be using the dictionary for the entire period of iteration (including however long it takes to do the processing for each and every entry in the dictionary). That is most likely where the contention occurs.
Making a copy of the dictionary is much faster, so it should lower contention.
Change SnapTotals to:
public void SnapTotals()
{
var copy = Handler.MessageEventHandler.Realtime.ToArray();
foreach (var kvpMarketData in copy)
{
...
Now, each ExecutionLoopX can execute in peace, without write-side contention (your API updates) and without read-side contention from the other loops. The write-side can execute without read-side contention as well.
The only "contention" should be for the short duration needed to make each copy.
And by the way, the dictionary copy (an array) is not thread-safe; it's just a plain array, but that is OK because each task is executing in isolation on its own copy.
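As a minimal, self-contained sketch of the snapshot approach (the dictionary and MarketData shape here are illustrative, not the real ones):

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;

public record MarketData(decimal Last, long Volume); // illustrative shape

public static class SnapshotDemo
{
    public static readonly ConcurrentDictionary<string, MarketData> Realtime = new();

    public static decimal SnapTotals()
    {
        // ToArray() takes a moment-in-time snapshot; the live dictionary is
        // held only briefly, and writers can keep updating it while the
        // long-running processing below runs against the copy.
        var copy = Realtime.ToArray();

        decimal total = 0;
        foreach (var kvp in copy)
            total += kvp.Value.Last; // long-running work happens on the copy
        return total;
    }
}
```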
CodePudding user response:
I think that your main problem is not related to the ConcurrentDictionary, but to the large number of ExecutionLoopX methods. Each of these methods saturates a CPU core, and since there are more methods than cores on your machine, the whole CPU is saturated. My assumption is that if you find a way to limit the degree of parallelism of the ExecutionLoopX methods to a number smaller than Environment.ProcessorCount, your program will behave and perform better. Below is my suggestion for implementing this limitation.
The main obstacle is that currently your ExecutionLoopX methods are monolithic: they can't be separated into pieces whose execution can be interleaved. My suggestion is to change their return type from void to IEnumerable, and place a yield return default; inside the outer loop. This way it will be possible to execute them in steps, with each step being the code from one yield return to the next.
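As a sketch of that transformation, with a trivial summation standing in for the real loop body (all names here are invented for illustration):

```csharp
using System.Collections;

public static class SliceDemo
{
    // Hypothetical monolithic original:
    //   void ExecutionLoop1() { while (sum < target) sum += step; }
    //
    // Resumable version: same terminal condition, but each MoveNext() call
    // advances the loop by exactly one iteration, so a scheduler can
    // interleave many such loops on a few threads.
    public static IEnumerable ExecutionLoop1(int step, int target, int[] sum)
    {
        while (sum[0] < target)   // terminal condition, unchanged
        {
            sum[0] += step;       // one unit of work per step
            yield return default; // pause point: hands control back
        }
    }
}
```

Nothing runs until MoveNext() is called, and each call executes exactly one pass through the loop body:

```csharp
var sum = new int[1];
var e = SliceDemo.ExecutionLoop1(5, 20, sum).GetEnumerator();
int steps = 0;
while (e.MoveNext()) steps++;
// steps == 4, sum[0] == 20
```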
Then create a BlockingCollection<IEnumerator>, and populate it with the enumerators of each of the ExecutionLoopX methods:
BlockingCollection<IEnumerator> enumerators = new();
enumerators.Add(ExecutionLoop1(data).GetEnumerator());
enumerators.Add(ExecutionLoop2(data).GetEnumerator());
enumerators.Add(ExecutionLoop3(data).GetEnumerator());
enumerators.Add(ExecutionLoop4(data).GetEnumerator());
//...
Now you can parallelize the execution of the methods by feeding the BlockingCollection to a Parallel.ForEach loop. The body of the parallel loop will invoke the MoveNext method of an enumerator, which will cause a piece of an ExecutionLoopX method to be executed. Then the enumerator will be added back into the BlockingCollection, unless MoveNext returns false, which will happen when the associated ExecutionLoopX has met its terminal condition and exited. There are a couple more details regarding the buffering and the completion of the BlockingCollection, which are shown in the example below:
int pendingEnumeratorsCount = enumerators.Count;
var partitioner = Partitioner.Create(enumerators.GetConsumingEnumerable(),
EnumerablePartitionerOptions.NoBuffering);
ParallelOptions options = new()
{
MaxDegreeOfParallelism = Environment.ProcessorCount - 1
};
Parallel.ForEach(partitioner, options, enumerator =>
{
bool moved = false;
try
{
moved = enumerator.MoveNext();
}
finally
{
if (!moved)
{
if (Interlocked.Decrement(ref pendingEnumeratorsCount) == 0)
enumerators.CompleteAdding();
}
}
if (moved) enumerators.Add(enumerator);
});
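To see the whole pattern end-to-end, here is a self-contained toy version; the CountTo loops are stand-ins for the real ExecutionLoopX methods, and all names are illustrative:

```csharp
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class CooperativeDemo
{
    // Stand-in for a real ExecutionLoopX: counts up to 'limit', one step
    // per MoveNext(), recording its progress in 'cell'.
    public static IEnumerable CountTo(int limit, int[] cell)
    {
        while (cell[0] < limit)   // terminal condition
        {
            cell[0]++;            // one unit of work
            yield return default; // pause point
        }
    }

    // Drives all enumerators to completion with at most 'maxDop' steps
    // in flight at any moment.
    public static void RunAll(List<IEnumerator> list, int maxDop)
    {
        BlockingCollection<IEnumerator> enumerators = new();
        foreach (var e in list) enumerators.Add(e);

        int pending = list.Count;
        var partitioner = Partitioner.Create(
            enumerators.GetConsumingEnumerable(),
            EnumerablePartitionerOptions.NoBuffering);
        ParallelOptions options = new() { MaxDegreeOfParallelism = maxDop };

        Parallel.ForEach(partitioner, options, enumerator =>
        {
            bool moved = false;
            try { moved = enumerator.MoveNext(); }
            finally
            {
                // When the last loop finishes, unblock GetConsumingEnumerable
                // so Parallel.ForEach can return.
                if (!moved && Interlocked.Decrement(ref pending) == 0)
                    enumerators.CompleteAdding();
            }
            if (moved) enumerators.Add(enumerator); // reschedule for the next step
        });
    }
}
```

Note that each enumerator is only ever held by one worker at a time (it is taken out of the collection, stepped once, then re-added), so the loop bodies themselves need no extra synchronization.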
The Environment.ProcessorCount - 1 configuration means one CPU core will be available for other work, like the communication with the external API and the updating of the ConcurrentDictionary.