I'm going to start by describing my use case:
I have built an app which processes LARGE datasets, runs various transformations on them and them spits them out. This process is very time sensitive so a lot of time has gone into optimising.
The idea is to read a bunch of records at a time, process each one on different threads and write the results to file. But instead of writing them to one file, the results are written to one of many temp files which get combined into the desired output file at the end. This is so that we avoid memory write protection exceptions or bottlenecks (as much as possible).
To achieve that, we have an array of 10 fileUtils, 1 of which get passed to a thread as it is initiated. There is a threadCountIterator
which increments at each localInit
, and is reset back to zero when that count reaches 10. That value is what determines which of the fileUtils objects get passed to the record processing object per thread. The idea is that each util class is responsible for collecting and writing to just one of the temp output files.
It's worth nothing that each FileUtils object gathers about 100 records in a member outputBuildString
variable before writing it out, hence having them exist separately and outside of the threading process, where objects lifespan is limited.
The is to more or less evenly disperse the responsability for collecting, storing and then writing the output data across multiple fileUtil objects which means we can write more per second than if we were just writing to one file.
my problem is that this approach results in a Array Out Of Bounds exception as my threadedOutputIterator
jumps above the upper limit value, despite there being code that is supposed to reduce it when this happens:
//by default threadCount = 10
private void ProcessRecords()
{
try
{
Parallel.ForEach(clientInputRecordList, new ParallelOptions { MaxDegreeOfParallelism = threadCount }, LocalInit, ThreadMain, LocalFinally);
}
catch (Exception e)
{
Console.WriteLine("The following error occured: " e);
}
}
private SplitLineParseObject LocalInit()
{
if (threadedOutputIterator >= threadCount)
{
threadedOutputIterator = 0;
}
//still somehow goes above 10, and this is where the excepetion hits since there are only 10 objects in the threadedFileUtils array
SplitLineParseObject splitLineParseUtil = new SplitLineParseObject(parmUtils, ref recCount, ref threadedFileUtils[threadedOutputIterator], ref recordsPassedToFileUtils);
if (threadedOutputIterator<threadCount)
{
threadedOutputIterator ;
}
return splitLineParseUtil;
}
private SplitLineParseObject ThreadMain(ClientInputRecord record, ParallelLoopState state, SplitLineParseObject threadLocalObject)
{
threadLocalObject.clientInputRecord = record;
threadLocalObject.ProcessRecord();
recordsPassedToObject ;
return threadLocalObject;
}
private void LocalFinally(SplitLineParseObject obj)
{
obj = null;
}
As explained in the above comment,it still manages to jump above 10, and this is where the excepetion hits since there are only 10 objects in the threadedFileUtils array. I understand that this is because multiple threads would be incrementing that number at the same time before either of the code in those if statements could be called, meaning theres still the chance it will fail in its current state.
How could I better approach this such that I avoid that exception, while still being able to take advantage of the read, store and write efficiency that having multiple fileUtils gives me?
Thanks!
CodePudding user response:
But instead of writing them to one file, the results are written to one of many temp files which get combined into the desired output file at the end
That is probably not a great idea. If you can fit the data in memory it is most likely better to keep it in memory, or do the merging of data concurrently with the production of data.
To achieve that, we have an array of 10 fileUtils, 1 of which get passed to a thread as it is initiated. There is a threadCountIterator which increments at each localInit, and is reset back to zero when that count reaches 10
This does not sound safe to me. The parallel loop should guarantee that no more than 10 threads should run concurrently (if that is your limit), and that local init will run once for each thread that is used. As far as I know it makes no guarantee that no more than 10 threads will be used in total, so it seem possible that thread #0 and thread #10 could run concurrently.
The correct usage would be to create a new fileUtils
-object in the localInit.
This more or less works and ends up being more efficient than if we are writing to just one file
Are you sure? typically IO does not scale very well with concurrency. While SSDs are absolutely better than HDDs, both tend to work best with sequential IO.
How could I better approach this?
My approach would be to use a single writing thread, and a blockingCollection as a thread-safe buffer between the producers and the writer. This assumes that the order of items is not significant:
public async Task ProcessAndWriteItems(List<int> myItems)
{
// BlockingCollection uses a concurrentQueue by default
// Can also set a max size , in case the writer cannot keep up with the producers
var writeQueue = new BlockingCollection<string>();
var writeTask = Task.Run(() => Writer(writeQueue));
Parallel.ForEach(
myItems,
item =>
{
writeQueue.Add(item.ToString());
});
writeQueue.CompleteAdding(); // signal the writer to stop once all items has been processed
await writeTask;
}
private void Writer(BlockingCollection<string> queue)
{
using var stream = new StreamWriter(myFilePath);
foreach (var line in queue.GetConsumingEnumerable())
{
stream.WriteLine(line);
}
}
There is also dataflow that should be suitable for tasks like this. But I have not used it, so I cannot provide specific recommendations.
Note that multi threaded programming is difficult. While it can be made easier by proper use of modern programming techniques, you still need need to know a fair bit about thread safety to understand the problems, and what options and tools exist to solve them. You will not always be so lucky to get actual exceptions, a more typical result of multi threading bugs would be that your program just produces the wrong result. If you are unlucky this only occur in production, on a full moon, and only when processing important data.
CodePudding user response:
LocalInit
obviously is not thread safe, so when invoked multiple times in parallel it will have all the multithreading problems caused by not-atomic operations. As a quick fix you can lock the whole method:
private object locker = new object();
private SplitLineParseObject LocalInit()
{
lock (locker)
{
if (threadedOutputIterator >= threadCount)
{
threadedOutputIterator = 0;
}
SplitLineParseObject splitLineParseUtil = new SplitLineParseObject(parmUtils, ref recCount,
ref threadedFileUtils[threadedOutputIterator], ref recordsPassedToFileUtils);
if (threadedOutputIterator < threadCount)
{
threadedOutputIterator ;
}
return splitLineParseUtil;
}
}
Or maybe try to workaround with Interlocked
for more fine-grained control and better performance (but it would not be very easy).
Note that there is still no guarantee that all previous writes are actually finished i.e. for 10 files there is a possibility that the one with 0
index is not yet finished while next 9 are and the 10
th will try writing to the same file as 0
th is writing too. Possibly you should consider another approach (if you still want to write to multiple files, though IO does not usually scale that well, so possibly just blocking write with queue is a way to go). You can consider splitting your data in chunks and process every one of them in parallel (i.e. "thread" per chunk) while every chunk writes to it's own file, so there is no need for sync.
Some potentially useful reading: