Wholly new to multithreading.
I am writing a program which takes as input a vector of objects and an integer for the number of threads to dedicate. The nature of the objects isn't important, only that each has several members that are file paths to large text files. Here's a simplified version:
// Not very important. Reads a file and writes a new version that omits
// some lines.
void proc_file(OBJ obj) {
    std::string inFileStr(obj.get_path().c_str());
    std::string outFileStr(std::string(obj.get_path().replace_extension("new.txt").c_str()));
    std::ifstream inFile(inFileStr);
    std::ofstream outFile(outFileStr);
    std::string currLine;
    while (getline(inFile, currLine)) {
        // Lines shorter than 5 characters cannot end in "thing"; checking the
        // size first keeps length()-5 from wrapping around.
        if (currLine.size() < 5 ||
            currLine.compare(currLine.length()-5, 5, "thing") != 0) {
            outFile << currLine << '\n';
        }
        else {
            // Skip the three lines that follow a matching line.
            for (int i = 0; i < 3; i++) {
                getline(inFile, currLine);
            }
        }
    }
    inFile.close();
    outFile.close();
}
// Processes n files concurrently, working its way through
// all OBJ in objs
void multi_file_proc(std::vector<OBJ> objs, int n) {
    std::vector<std::thread> procVec;
    for (std::size_t i = 0; i < objs.size(); i++) {
        /*
        Ensure that n files are always being processed.
        Upon completion of one, initiate another, until
        all OBJ in objs have had their text files changed.
        */
    }
}
I want to loop through each OBJ and write altered versions of their text files concurrently, with the number of simultaneous file reads/writes limited by the thread value (n). Ultimately, all the objects' text files must be changed, but in such a way that n files are always being processed, to get the most out of the available threads.
Note the vector of threads, procVec. I originally approached this by managing a vector of threads, with a file being processed for each thread in procVec. From my reading, it seems a vector for managing these tasks is logical. But how do I always ensure there are n files open until all have been processed, without exiting with an open thread?
Edit: Apologies, my intention was not to ask others to write code for me. I just didn't want my approach to bias anyone's answer if the approach was bad to begin with.
These are some things I've tried (this code would go into the block comment in my function):
1. First approach. Idea is to add to procVec up until the thread limit n was reached, then join, remove a process from the front of the vector upon its completion. This is a summary of several similar iterations, none of which worked:
if (i >= n) {
    procVec.front().join();
    procVec.erase(procVec.begin());
}
procVec.push_back(std::thread(proc_file, sra[i]));
Problems with this:
- Incorrectly assumes front of vector will always finish first
- (Possibly?) Invalidates all iterators in procVec after first is erased
2. Using mutexes, I attempted to write a lambda function in which the thread would be removed upon its completion. This is my current approach. I'm unsure why it isn't working, or whether it even suits my needs:
// remThread() and lamb() are defined above the main function; procVec and
// threadMutex are global variables
void remThread(std::thread::id id) {
    std::lock_guard<std::mutex> lock(threadMutex);
    auto iter = std::find_if(procVec.begin(), procVec.end(), [=](std::thread &t)
        {return (t.get_id() == id); });
    if (iter != procVec.end()) {
        iter->join();
        procVec.erase(iter);
    }
}
void lamb(SRA sra, std::thread::id id) {
    proc_file(sra);
    remThread(id);
}
// This is the code contained in the main for loop. It calls lamb to process the
// file and then remove the thread
std::lock_guard<std::mutex> lock(threadMutex);
procVec.push_back(std::thread([sras, i]() {
    std::thread(lamb, sras[i], std::this_thread::get_id()).detach();
}));
Problems with this:
- Program terminates, likely because a thread that is still joinable goes out of scope
Answer:
Given that the example you show is fairly simple (a for loop of fixed size with no strange dependencies), a very simple solution could be to use OpenMP, which would let you do what you describe (provided I understood correctly) by adding a single line in front of the for loop:
void multi_file_proc(std::vector<OBJ> objs, int n) {
    std::vector<std::thread> procVec;
    #pragma omp parallel for num_threads(n) schedule(dynamic, 1)
    for (int i = 0; i < objs.size(); i++) {
        /*
        ...
        */
    }
}
You then have to modify your compile command to enable OpenMP support. The precise flag differs from compiler to compiler, e.g. -fopenmp for g++, -qopenmp for icpc, etc.
The line above instructs the compiler to generate code that executes the for loop below it in parallel. The important bit is the last clause, where we set the schedule. Dynamic means that the order is not predetermined; instead, each thread gets its next iteration when it finishes the previous one. The integer 1 is the chunk size, i.e. the number of iterations a thread takes at a time. Since each file is large, we want this to be fine-grained, and we don't expect much overhead from the scheduling.
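To make the schedule clause concrete, here is a minimal sketch of the same pattern in a form that compiles on its own. The names fake_proc and process_all are made up for illustration, and fake_proc merely stands in for proc_file. Each iteration writes only to its own slot of the output vector, so no locking is needed.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-in for proc_file: returns the length of the path so the
// example has something observable to store.
std::size_t fake_proc(const std::string& path) { return path.size(); }

// Runs fake_proc over every path using at most n OpenMP threads.
// Iterations are handed out one at a time (chunk size 1) to whichever
// thread becomes free first.
std::vector<std::size_t> process_all(const std::vector<std::string>& paths, int n) {
    assert(n > 0);
    std::vector<std::size_t> out(paths.size());
    // A signed loop counter keeps the loop acceptable to older OpenMP versions.
    const long long count = static_cast<long long>(paths.size());
    #pragma omp parallel for num_threads(n) schedule(dynamic, 1)
    for (long long i = 0; i < count; ++i) {
        const std::size_t idx = static_cast<std::size_t>(i);
        out[idx] = fake_proc(paths[idx]);  // each iteration owns out[idx]
    }
    return out;
}
```

If the OpenMP flag is left off, the pragma is ignored and the loop simply runs serially, which is a handy way to check that the parallel and serial results agree.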
A word of caution: OpenMP, like most of C++, will not even try to stop you from shooting yourself in the foot. And with concurrency there are whole new ways to do just that.
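One common way to do that is to let several threads update the same variable. As a small sketch (sum_counts and its input are hypothetical, not part of the original program), suppose each file yields a count and you want the total. Adding to a shared variable inside the parallel loop would be a data race unless OpenMP is told how to combine the per-thread results, which is what the reduction clause does.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Sums per-file counts (hypothetical data) using at most n OpenMP threads.
long long sum_counts(const std::vector<int>& counts, int n) {
    assert(n > 0);
    long long total = 0;
    const long long size = static_cast<long long>(counts.size());
    // Without the reduction clause every thread would modify `total` directly,
    // which is a data race. reduction(+:total) gives each thread a private
    // copy and adds the copies together when the loop ends.
    #pragma omp parallel for num_threads(n) schedule(dynamic, 1) reduction(+:total)
    for (long long i = 0; i < size; ++i) {
        total += counts[static_cast<std::size_t>(i)];
    }
    return total;
}
```

The compiler will accept the loop without the reduction clause just as readily, which is exactly the kind of silent mistake the warning is about.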
Finally, this is by no means guaranteed to be the best solution outright. For instance, if your files are of varying lengths, you would probably want to sort the objects from longest to shortest before the loop. At some point only a single thread will be working on the final object; with this ordering that final object is one of the shortest, so the tail end of the run won't take too long.
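A rough sketch of that ordering step, assuming C++17 and that the paths are available as std::filesystem::path values. The helper names with_sizes and sort_largest_first are invented for this example, and treating unreadable files as size 0 is just one possible choice.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <filesystem>
#include <system_error>
#include <utility>
#include <vector>

namespace fs = std::filesystem;
using SizedPath = std::pair<fs::path, std::uintmax_t>;

// Pairs each path with its size in bytes; files whose size cannot be read
// are treated as size 0 (an assumption made for this sketch).
std::vector<SizedPath> with_sizes(const std::vector<fs::path>& paths) {
    std::vector<SizedPath> out;
    out.reserve(paths.size());
    for (const auto& p : paths) {
        std::error_code ec;
        const std::uintmax_t sz = fs::file_size(p, ec);
        out.emplace_back(p, ec ? std::uintmax_t{0} : sz);
    }
    return out;
}

// Orders the entries from largest to smallest so the long jobs start first.
void sort_largest_first(std::vector<SizedPath>& files) {
    auto larger = [](const SizedPath& a, const SizedPath& b) {
        return a.second > b.second;
    };
    std::stable_sort(files.begin(), files.end(), larger);
    assert(std::is_sorted(files.begin(), files.end(), larger));
}
```

The sizes are looked up once before sorting rather than inside the comparator, so the file system is queried only once per file.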