I have a task: to write a multithreaded web crawler (actually, I have a local set of .html files that I need to parse and move to another directory). The main requirement is that the number of threads can be set arbitrarily, so that I can determine at what thread count the program stops gaining performance.
#include <iostream>
#include <fstream>
#include <thread>
#include <mutex>
#include <queue>
#include <ctime>
#include <set>
#include <chrono>
#include <atomic>
#include <cctype>
#include <string>
#include <vector>
using namespace std;
class WebCrawler{
private:
    const string start_path = "/";
    const string end_path = "/";
    int thread_counts;
    string home_page;
    queue<string> to_visit;
    set<string> visited;
    vector<thread> threads;
    mutex mt1;
    int count;
public:
    WebCrawler(int thread_counts_, string root_)
    :thread_counts(thread_counts_), home_page(root_) {
        to_visit.push(root_);
        visited.insert(root_);
        count = 0;
    }
    // Starts the worker threads and waits for all of them to finish.
    void crawler(){
        for(int i = 0; i<thread_counts; i++)
            threads.push_back(thread(&WebCrawler::start_crawl, this));
        for(auto &th: threads)
            th.join();
        cout<<"Count: "<<count<<endl;
    }
    // Reads the first line of the page and queues every link found in it.
    void parse_html(string page_){
        cout<<"Thread: "<<this_thread::get_id()<<" page: "<<page_<< endl;
        ifstream page;
        page.open(start_path + page_, ios::in);
        string tmp;
        getline(page, tmp);
        page.close();
        for(int i = 0; i<tmp.size(); i++){
            if( tmp[i] == '<'){
                // Collect the digits inside the tag to build "<number>.html".
                string tmp_num ="";
                while(tmp[i]!= '>'){
                    if(isdigit(tmp[i]))
                        tmp_num += tmp[i];
                    i++;
                }
                tmp_num += ".html";
                if((visited.find(tmp_num) == visited.end())){
                    mt1.lock();
                    to_visit.push(tmp_num);
                    visited.insert(tmp_num);
                    mt1.unlock();
                }
            }
        }
    }
    // Creates the page in the target directory and counts it.
    void move(string page_){
        mt1.lock();
        count++;
        ofstream page;
        page.open(end_path + page_, ios::out);
        page.close();
        mt1.unlock();
    }
    // Worker loop: take a page from the queue, parse it, move it.
    void start_crawl(){
        cout<<"Thread started: "<<this_thread::get_id()<< endl;
        string page;
        while(!to_visit.empty()){
            mt1.lock();
            page = to_visit.front();
            to_visit.pop();
            mt1.unlock();
            parse_html(page);
            move(page);
        }
    }
};
int main(int argc, char const *argv[])
{
    int start_time = clock();
    WebCrawler crawler(7, "0.html");
    crawler.crawler();
    int end_time = clock();
    cout<<"Time: "<<(float)(end_time -start_time)/CLOCKS_PER_SEC<<endl;
    cout<<thread::hardware_concurrency()<<endl;
    return 0;
}
- 1 thread = Time: 0.709504
- 2 threads = Time: 0.668037
- 4 threads = Time: 0.762967
- 7 threads = Time: 0.781821
I've been trying to figure out for a week why my program does not get faster, and beyond two threads even gets slower. I probably don't fully understand how a mutex works, or perhaps the time is lost while joining the threads. Do you have any ideas how to fix it?
CodePudding user response:
I think there's a bottleneck in your move function. It holds the mutex for the whole call, including opening and closing the file, so the threads go through it one at a time no matter how many of them there are. You could start with that.
CodePudding user response:
There are many ways to protect things in multithreading, implicit or explicit.
The code below is totally untested. There are also some implicit assumptions that must be considered, for example that int is large enough for your task.
Let's make a short analysis of what needs protection.
- Variables that are accessed from multiple threads
  - things that are const can be excluded, unless you const_cast them
  - or part of them is mutable
- Global objects like files or cout
  - could be overwritten
  - written from multiple threads
  - streams have their own internal locks
    - so you can write to cout from multiple threads (although the output of different threads may interleave)
    - but you don't want that for the files in this case
  - if multiple threads want to open the same file, you will get an error
  - std::endl forces a flush every time it is used, so change it to "\n" like a commenter noted
So this boils down to:
queue<string> to_visit;
set<string> visited; // should be renamed visiting
int count;
<streams and files>
count is easy
std::atomic<int> count;
The files are implicitly protected by your visited/visiting check, because each page name is queued only once and so only one thread ever opens a given file. They are good too, so the mutex in move can be removed.
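As a minimal sketch of the atomic counter, assuming nothing else needs to be updated together with it: the helper name count_with_atomic and its parameters are made up for this illustration and are not part of the original program.

```cpp
#include <atomic>
#include <thread>
#include <vector>

// Hypothetical helper: `threads` workers each bump a shared counter
// `perThread` times, with no mutex around the increment.
int count_with_atomic(int threads, int perThread) {
    std::atomic<int> count{0};
    std::vector<std::thread> pool;
    for (int t = 0; t < threads; ++t) {
        pool.emplace_back([&count, perThread] {
            for (int i = 0; i < perThread; ++i)
                count.fetch_add(1, std::memory_order_relaxed);  // atomic increment
        });
    }
    for (auto& th : pool) th.join();  // join makes all increments visible here
    return count.load();
}
```

Calling count_with_atomic(4, 1000) should give 4000 on every run, which a plain int incremented without a lock does not guarantee.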
The remaining two containers each need a mutex of their own, as they can be updated independently.
mutex mutTovisit, // formerly known as mt1
      mutVisiting;
Now we have the problem that we could deadlock with two mutexes if we lock them in a different order in two places. You need to read up on locking if you add more locks; std::scoped_lock and std::lock are good places to start.
Change the code in parse_html to:
{
    scoped_lock visitLock(mutVisiting); // unlocks at end of } even if stuff throws
    if((visited.find(tmp_num) == visited.end())){
        scoped_lock toLock(mutTovisit);
        to_visit.push(tmp_num);
        visited.insert(tmp_num);
    }
}
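An alternative that sidesteps the lock-order question is to hand both mutexes to a single std::scoped_lock, which locks them with a deadlock-avoidance algorithm. The sketch below assumes C++17; the Frontier struct and add_if_new are hypothetical names used only to keep the example self-contained.

```cpp
#include <mutex>
#include <queue>
#include <set>
#include <string>

// Hypothetical stand-in for the two shared containers and their mutexes.
struct Frontier {
    std::mutex mutTovisit;
    std::mutex mutVisiting;
    std::queue<std::string> to_visit;
    std::set<std::string> visited;

    // Returns true if the page was new and has been queued.
    bool add_if_new(const std::string& page) {
        // One scoped_lock over both mutexes: the order of the arguments
        // does not matter, so two call sites cannot deadlock each other.
        std::scoped_lock lock(mutTovisit, mutVisiting);
        if (!visited.insert(page).second)
            return false;  // insert failed, so the page was already known
        to_visit.push(page);
        return true;
    }
};
```

This holds both locks for the whole check-and-insert, which is slightly coarser than the nested version, but the critical section is short in either case.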
The worker loop also contains several errors, which are hidden by the non-thread-safe access to to_visit and the randomness of the thread start-up. The numbered comments give one possible order of events:
while(!to_visit.empty()){ // 2. now the next thread starts, sees it's empty and stops
                          // 3. or worse, it passes this check and then hangs at the lock
    mt1.lock();
    page = to_visit.front(); // 4. and does things that are not good here with an empty to_visit
    to_visit.pop(); // 1. is now empty after reading root
    mt1.unlock();
    parse_html(page);
    move(page);
}
To solve this you need a counter, found (an atomic, perhaps), of the pages that are currently known but not yet processed, so that the threads can tell when the crawl is done. To wake threads when there is new work to do, you can use std::condition_variable (or std::condition_variable_any).
The general idea is to have the threads wait until work is available; each time a new page is discovered, call notify_one to start a worker.
To start up, set found to 1 (the root page) and call notify_one. Once the threads are running, a thread that finishes a page decreases found. The thread that decreases it to zero calls notify_all, so that all waiting threads wake up and can stop.
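The plan above could be sketched roughly as follows. This is an illustration under stated assumptions, not a drop-in replacement: crawl, LinkFn and links_of are hypothetical names, links_of stands in for parse_html plus move, and found is guarded by the same single mutex as the queue instead of being atomic, which keeps the wait condition simple and avoids missed wake-ups.

```cpp
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <set>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical callback: returns the pages linked from the given page.
using LinkFn = std::function<std::vector<std::string>(const std::string&)>;

// Crawls from root with thread_count workers; returns the number of pages seen.
std::size_t crawl(const std::string& root, int thread_count, const LinkFn& links_of) {
    std::mutex m;
    std::condition_variable cv;
    std::queue<std::string> to_visit;
    std::set<std::string> visited{root};
    int found = 1;  // pages known but not yet fully processed (queued or in progress)
    to_visit.push(root);

    auto worker = [&] {
        for (;;) {
            std::string page;
            {
                std::unique_lock<std::mutex> lock(m);
                // Sleep until there is work, or until nothing is left anywhere.
                cv.wait(lock, [&] { return !to_visit.empty() || found == 0; });
                if (to_visit.empty()) return;  // found == 0: the crawl is done
                page = std::move(to_visit.front());
                to_visit.pop();
            }
            std::vector<std::string> links = links_of(page);  // slow part, no lock held
            {
                std::lock_guard<std::mutex> lock(m);
                for (auto& link : links) {
                    if (visited.insert(link).second) {  // true only for new pages
                        to_visit.push(link);
                        ++found;
                        cv.notify_one();  // wake one waiting worker per new page
                    }
                }
                if (--found == 0) cv.notify_all();  // last page done: release everyone
            }
        }
    };

    std::vector<std::thread> pool;
    for (int i = 0; i < thread_count; ++i) pool.emplace_back(worker);
    for (auto& th : pool) th.join();
    return visited.size();
}
```

Because the predicate is checked under the lock, a worker that starts late still sees the root page in the queue, and no worker calls front() on an empty queue. The file I/O happens outside the lock, so it is the only part that can actually run in parallel.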
What you will probably find is that if the data is on a single slow disk, you are unlikely to see much benefit from more than 2 threads. If all the files are already cached in RAM, you might see more of an effect.