I'm trying to write multi-threaded code that computes the sum of the elements of a vector.
The code is very simple:
- The threads are defined through a vector of threads;
- The number of threads is defined by the ThreadsSize variable;
- With ThreadsSize equal to 1, the sum takes about 300ms, while with 8 threads it takes about 1200ms.
I'm running on hardware with 8 cores.
Can someone explain why this happens? Theoretically I would expect about 300/8 ms when 8 threads are used. Is that not correct?
Here is the code:
#include <iostream>
#include "math.h"
#include <vector>
#include <thread>
#include <mutex>
#include <chrono>
using namespace std;
/* Parallel sum function */
void Function_Sum(mutex& Mutex, vector<double>& Vector, int unsigned kin, int unsigned kend, double& Sum)
{
    for(int unsigned k = kin; k <= kend; k = k + 1)
    {
        Mutex.lock();
        Sum = Sum + Vector[k];
        Mutex.unlock();
    }
}
/* Main function */
int main(int argc, char *argv[])
{
    // Threads and mutex initialization
    int unsigned ThreadsSize = 1;
    vector<thread> Threads;
    mutex Mutex;
    // Vector definition
    vector<double> Vector(10000000, 1);
    // Indexes initialization
    int unsigned kin, kend;
    int unsigned dk = floor(Vector.size() / ThreadsSize);
    // Output 1
    cout << "VectorSize = " << Vector.size() << ", ThreadsSize = " << ThreadsSize << ", dk = " << dk << endl;
    // Parallel sum
    auto t_start = std::chrono::high_resolution_clock::now();
    double Sum = 0;
    for(int unsigned k = 0; k <= ThreadsSize - 1; k = k + 1)
    {
        kin = k * dk;
        kend = (k + 1) * dk - 1;
        if(k == ThreadsSize - 1)
        {
            kend = Vector.size() - 1;
        }
        cout << k << " in: " << kin << ", end: " << kend << endl;
        Threads.push_back(thread(Function_Sum, ref(Mutex), ref(Vector), kin, kend, ref(Sum)));
    }
    // Threads joining
    for(int unsigned k = 0; k <= ThreadsSize - 1; k = k + 1)
    {
        Threads[k].join();
    }
    // Elapsed time calculation
    auto t_end = std::chrono::high_resolution_clock::now();
    double elapsed_time_ms = std::chrono::duration<double, std::milli>(t_end - t_start).count();
    // Output 2
    cout << "Sum = " << Sum << endl;
    cout << "Time = " << elapsed_time_ms << endl;
}
Thanks in advance.
CodePudding user response:
Can someone explain why this happens? Theoretically I would expect to have 300/8 ms in case 8 threads are used. Is it not correct?
Theoretically, you could get something close to 300/8 ms (plus the overhead of creating and managing the threads).
But your way of using the mutex completely prevents any parallelization:
Mutex.lock();
Sum = Sum + Vector[k];
Mutex.unlock();
What you do here is:
- request the lock, so that any other thread has to wait until the mutex is unlocked again
- do Sum = Sum + Vector[k];
- release the lock, so that another thread can acquire it
So none of the Sum = Sum + Vector[k]; statements are executed in parallel, and you end up with your original 300ms plus the overhead of all the mutex handling.
What you want to do is partition your array into 8 parts, sum up those partitions in parallel using separate storage for each thread, and then add up the results of the 8 threads.
For that divide-and-conquer approach you don't even need a mutex.
You just need a container of size 8 in which each thread can store its result.
After your "join" loop you can iterate over that container and add the individual sums together, as sketched below.
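Here is a minimal sketch of that std::thread variant (the helper Partial_Sum and the PartialSums vector are illustrative names, not from your original code; the partitioning mirrors the index computation from the question):
#include <functional>
#include <iostream>
#include <thread>
#include <vector>

// Each thread accumulates into its own slot of PartialSums, so no mutex is
// needed while summing.
void Partial_Sum(const std::vector<double> &Vector, unsigned int kin,
                 unsigned int kend, double &Result) {
  double s = 0;
  for (unsigned int k = kin; k <= kend; k = k + 1) {
    s = s + Vector[k];
  }
  Result = s;
}

int main() {
  unsigned int ThreadsSize = 8;
  std::vector<double> Vector(10000000, 1);
  unsigned int dk = Vector.size() / ThreadsSize;
  // One result slot per thread
  std::vector<double> PartialSums(ThreadsSize, 0.0);
  std::vector<std::thread> Threads;
  for (unsigned int k = 0; k < ThreadsSize; k = k + 1) {
    unsigned int kin = k * dk;
    unsigned int kend =
        (k == ThreadsSize - 1) ? Vector.size() - 1 : (k + 1) * dk - 1;
    Threads.push_back(std::thread(Partial_Sum, std::cref(Vector), kin, kend,
                                  std::ref(PartialSums[k])));
  }
  for (auto &t : Threads) {
    t.join();
  }
  // Sum up the per-thread results; only this final loop touches Sum.
  double Sum = 0;
  for (double s : PartialSums) {
    Sum = Sum + s;
  }
  std::cout << "Sum = " << Sum << std::endl;
  return 0;
}
Each thread only writes to its own element of PartialSums, so there is no data race and no lock at all.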
Alternatively, you could use std::future, so that your code would look something like this:
#include "math.h"
#include <chrono>
#include <future>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>
using namespace std;
/* Parallel sum function */
double Function_Sum(vector<double> &Vector, int unsigned kin,
                    int unsigned kend) {
  double Sum = 0;
  for (int unsigned k = kin; k <= kend; k = k + 1) {
    Sum = Sum + Vector[k];
  }
  return Sum;
}
/* Main function */
int main(int argc, char *argv[]) {
  // Threads and mutex initialization
  int unsigned ThreadsSize = 8;
  vector<std::future<double>> Futures;
  mutex Mutex;
  // Vector definition
  vector<double> Vector(10000000, 1);
  // Indexes initialization
  int unsigned kin, kend;
  int unsigned dk = floor(Vector.size() / ThreadsSize);
  // Output 1
  cout << "VectorSize = " << Vector.size() << ", ThreadsSize = " << ThreadsSize
       << ", dk = " << dk << endl;
  // Parallel sum
  auto t_start = std::chrono::high_resolution_clock::now();
  double Sum = 0;
  for (int unsigned k = 0; k <= ThreadsSize - 1; k = k + 1) {
    kin = k * dk;
    kend = (k + 1) * dk - 1;
    if (k == ThreadsSize - 1) {
      kend = Vector.size() - 1;
    }
    cout << k << " in: " << kin << ", end: " << kend << endl;
    Futures.push_back(std::async(std::launch::async, [&Vector, kin, kend]() {
      return Function_Sum(Vector, kin, kend);
    }));
  }
  // Collect the partial sums from the futures
  for (int unsigned k = 0; k <= ThreadsSize - 1; k = k + 1) {
    Sum += Futures[k].get();
  }
  // Elapsed time calculation
  auto t_end = std::chrono::high_resolution_clock::now();
  double elapsed_time_ms =
      std::chrono::duration<double, std::milli>(t_end - t_start).count();
  // Output 2
  cout << "Sum = " << Sum << endl;
  cout << "Time = " << elapsed_time_ms << endl;
  return 0;
}
Besides that, you in general want to utilize other standard-library facilities like std::accumulate, range-based for loops, and iterators. And you shouldn't use using namespace std.
With that your code could look like this:
#include <chrono>
#include <cmath>
#include <future>
#include <iostream>
#include <iterator>
#include <numeric>
#include <thread>
#include <vector>
int main() {
  unsigned int ThreadsSize = 8;
  std::vector<std::future<double>> Futures;
  // Vector definition
  std::vector<double> Vector(10000000, 1);
  // Indexes initialization
  unsigned int dk = std::floor(Vector.size() / ThreadsSize);
  // Output 1
  std::cout << "VectorSize = " << Vector.size()
            << ", ThreadsSize = " << ThreadsSize << ", dk = " << dk
            << std::endl;
  // Parallel sum
  auto t_start = std::chrono::high_resolution_clock::now();
  auto currentIterator = Vector.begin();
  for (unsigned int k = 0; k < ThreadsSize; k++) {
    // save a copy of the current iterator
    auto endIterator = currentIterator;
    if (k == ThreadsSize - 1) {
      // use the actual end iterator for the last thread
      endIterator = Vector.end();
    } else {
      // advance the end iterator
      std::advance(endIterator, dk);
    }
    // create an async task that returns a future
    Futures.push_back(
        std::async(std::launch::async, [currentIterator, endIterator]() {
          // create the sum over the partition
          return std::accumulate(currentIterator, endIterator, 0.0);
        }));
    currentIterator = endIterator;
  }
  // collect the results of the futures
  double Sum = 0;
  for (auto &future : Futures) {
    Sum += future.get();
  }
  // Elapsed time calculation
  auto t_end = std::chrono::high_resolution_clock::now();
  double elapsed_time_ms =
      std::chrono::duration<double, std::milli>(t_end - t_start).count();
  // Output 2
  std::cout << "Sum = " << Sum << std::endl;
  std::cout << "Time = " << elapsed_time_ms << std::endl;
  return 0;
}
CodePudding user response:
This small modification of Function_Sum gives you the speedup you were hoping for:
double sum = 0.;
for(int unsigned k = kin; k <= kend; k = k + 1)
    sum += Vector[k];
Mutex.lock();
Sum += sum;
Mutex.unlock();
The mutex is now locked once per thread instead of once per addition. If you want a simple explanation: locking and unlocking a mutex costs considerably more than an addition.
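For completeness, the whole Function_Sum with that modification would look roughly like this (a drop-in replacement for the question's original signature, with the std:: prefixes written out):
#include <mutex>
#include <vector>

/* Parallel sum function: accumulate into a function-local variable, then
   update the shared Sum once under the mutex. */
void Function_Sum(std::mutex &Mutex, std::vector<double> &Vector,
                  int unsigned kin, int unsigned kend, double &Sum)
{
    double sum = 0.;
    for (int unsigned k = kin; k <= kend; k = k + 1)
        sum += Vector[k];
    // One lock/unlock per thread instead of one per element.
    Mutex.lock();
    Sum += sum;
    Mutex.unlock();
}
In practice you would typically replace the manual lock()/unlock() pair with a std::lock_guard<std::mutex>, but the manual calls are kept here to stay close to the original code.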