Home > Net >  How to run my thread in parallel of while loop
How to run my thread in parallel of while loop

Time:09-17

Here is some code which increments several chronometers in parallel:

main.cpp
using namespace std;
 
#include <stdio.h>
#include <time.h>
#include <iostream>
#include <math.h>
#include <cstdlib>
#include <unistd.h>
 
#include <iostream>
#include <sstream>
#include <thread>
#include <vector>
#include <future>
 
#include "mychrono.hpp"

 
int main()
 
{
     
    std::vector<Chronometer*> car_crono;
    Chronometer chrono, output_chrono;
    std::vector<std::thread> threads;
    std::vector<std::future<Chronometer&>> futures;
 
    std::thread th;
    //future<Chronometer> ft;

    
    for(int i = 0; i < 2; i  )
    {
        car_crono.push_back(new Chronometer);
    }
    

 
 
    while (1) {
        
 
        for(int i = 0; i<2; i  )
            {
//
//                    //threads.push_back(std::thread(&Chronometer::start_chrono, car_crono[i], std::ref(chrono)));
//                    auto ft = std::async(std::launch::async, &Chronometer::start_chrono, car_crono[i], std::ref(chrono));
//
//                std::cout << "Hello-world" << std::endl;
                futures.emplace_back(std::async(std::launch::async, &Chronometer::start_chrono, car_crono[i], std::ref(chrono)));
                

            }
        
    
        std::cout << "hello-world" << std::endl;
        
        
        
        //auto ft = std::async(std::launch::async, &Chronometer::start_chrono, car_crono[0], std::ref(chrono));
        //std::cout << "Hello-world-2" << std::endl;
        
        for(auto&& f: futures){
                std::cout << f.get() << '\n';
        }
    }
    
    car_crono.clear();
    
}
mychrono.cpp
#include "mychrono.hpp"
 
#include <time.h>
#include <iostream>
#include <cstdlib>
#include <unistd.h>
 
#include <sstream>
#include <thread>
 
 
//int Chronometer::hour(0), min(0), sec(0);
 
Chronometer::Chronometer() : hour(0), min(0), sec(0)
{
 
}
 
Chronometer& Chronometer::start_chrono(Chronometer& chrono)
{
  
  // if(chrono.hour == 0 && chrono.min == 0 && chrono.sec == 0)
  // {
    bool condition = true;
    while(condition) {
      sleep(1);
      chrono.sec  ;
 
      if(chrono.sec > 59) {
        chrono.min  ;
        chrono.sec = 0;
 
      }
 
      if(chrono.min > 59) {
        chrono.hour  ;
        chrono.sec = 0;
        chrono.min = 0;
      }
//      if(chrono.sec == 10)
//      {
//        condition = false;
//      }
        
 
      std::cout << "chrono: " << chrono << std::endl;
 
   }
    
    return chrono;
 
  //}
 
  
}
 
 
Chronometer& Chronometer::finish_chrono(Chronometer& chrono)
{
    chrono.hour = 0;
    chrono.sec = 0;
    chrono.min = 0;
 
    return chrono;
}
 
 
std::ostream& operator<<(std::ostream& flux, Chronometer t)
{
    flux << t.hour << ":" << t.min << ":" << t.sec;
    return flux;
}
 
Chronometer& Chronometer::operator=(const Chronometer& other)
{
    // Guard self assignment
    //if (this == &other)
    return *this;
}
 
Chronometer::~Chronometer(){}

mychrono.hpp
#include <time.h>
#include <iostream>
#include <sstream>
 
#ifndef mychrono_hpp
#define mychrono_hpp
 
class Chronometer
{
    private:
        int hour, min, sec;
        //std::stringstream ss;
        //Chronometer chrono;
 
    public:
        
 
        Chronometer();
        Chronometer& start_chrono(Chronometer& chrono);
        Chronometer& finish_chrono(Chronometer& chrono);
        friend std::ostream& operator<<(std::ostream& flux, Chronometer t);
        Chronometer& operator=(const Chronometer& other);
        ~Chronometer();
 
};
 
 
#endif

My program runs well my two chronometers in parallel each other but still dependant of my while loop. For example here I will print "hello-world" once but need to wait my threads stop to print a second "hello-world" message in my while loop.

My question is how to make my threads runs in parallel an be completely independant of others instructions in my while loop ?

CodePudding user response:

Tzig had a similar idea as mine, that is using condition variables and such. I've made a full working example including comments and hopefully written for readability.

    #include <chrono>
    #include <iostream>
    #include <iomanip>
    #include <mutex>
    #include <future>
    #include <condition_variable>

    //-----------------------------------------------------------------------------------------------------
    // state of the timer.

    enum class State
    {
        idle,
        starting,
        running,
        stopping,
        stopped
    };

    //-----------------------------------------------------------------------------------------------------
    // helper class for use of std::condition_variable, makes code more readable
    // takes into account the pitfalls of condition variables : 
    // https://www.modernescpp.com/index.php/c-core-guidelines-be-aware-of-the-traps-of-condition-variables

    template<typename T>
    class StateVariable
    {
    public:
        StateVariable() = delete;
        StateVariable(const StateVariable&) = delete;
        StateVariable(StateVariable&&) = delete;
        StateVariable& operator=(const StateVariable&) = delete;

        explicit StateVariable(const T& value) :
            m_value{ value }
        {
        }

        void operator=(const T& value) noexcept
        {
            {
                std::unique_lock<std::mutex> lock(m_value_mutex);
                m_value = value;
            }
            m_value_changed.notify_all();
        }

        // atomic check and set
        T set_if(const T& from_value, const T& to_value) noexcept
        {
            {
                std::unique_lock<std::mutex> lock(m_value_mutex);
                if (m_value != from_value) return from_value;
                m_value = to_value;
            }
            m_value_changed.notify_all();
            return to_value;
        }

        const bool try_wait_for(const T& value, const std::chrono::steady_clock::duration& duration) const noexcept
        {
            auto pred = [this, value] { return (m_value == value); };
            std::unique_lock<std::mutex> lock(m_value_mutex);
            if (pred()) return true; 
            return m_value_changed.wait_for(lock, duration, pred);
        }

        void wait_for(const T& value) const
        {
            try_wait_for(value, std::chrono::steady_clock::duration::max());
        }

    private:
        // mutables so I could make the const promises on wait 
        // that they wont change the observable state (m_value)
        // of this class.
        mutable std::mutex m_value_mutex; 
        mutable std::condition_variable m_value_changed;
        std::atomic<T> m_value;
    };

    //-----------------------------------------------------------------------------------------------------
    // helper class for storing elapsed time, helps with readability later on 

    class ElapsedTime
    {
    public:

        explicit ElapsedTime(const std::chrono::steady_clock::duration& duration) :
            m_duration{ duration }
        {
        }

        auto hours() const
        {
            return std::chrono::duration_cast<std::chrono::hours>(m_duration).count();
        }

        auto minutes() const
        {
            return (std::chrono::duration_cast<std::chrono::minutes>(m_duration).count() % 60);
        }

        auto seconds() const
        {
            return (std::chrono::duration_cast<std::chrono::seconds>(m_duration).count() % 60);
        }

    private:
        std::chrono::steady_clock::duration m_duration;
    };

    //-----------------------------------------------------------------------------------------------------
    // formatter for ElapsedTime

    std::ostream& operator<<(std::ostream& os, const ElapsedTime& t)
    {
        os << std::setfill('0') << std::setw(2) << t.hours() << ':';
        os << std::setfill('0') << std::setw(2) << t.minutes() << ':';
        os << std::setfill('0') << std::setw(2) << t.seconds();
        return os;
    }

    //-----------------------------------------------------------------------------------------------------
    // ChronoMeter class
    // note I use std::chrono classes

    class ChronoMeter final
    {
    public:
        ChronoMeter() :
            m_state{ State::idle },
            m_duration{ std::chrono::steady_clock::duration::min() }
        {
        };

        ChronoMeter(const ChronoMeter&) = delete;
        ChronoMeter(ChronoMeter&&) = delete;
        ChronoMeter& operator=(const ChronoMeter&) = delete;

        void Start()
        {
            m_start_time = std::chrono::steady_clock::now();
        
            // exercise for the reader, also allow stopped Chronometers to be restarted.
            // for now just this simple state model
            if (m_state.set_if(State::idle, State::starting) != State::starting)
            {
                throw std::runtime_error("only an idle ChronoMeter can be started");
            }

            // it is okay to capture "this" because the destructor of the 
            // chronometer synchronizes with termination of this thread through the future
            m_future = std::async(std::launch::async, [this]
            {
                // Set indication that the thread has really started.
                // this is important because when std::async returns, this thread exists
                // but may not have been scheduled yet.
                m_state = State::running;
            
                do
                {
                    // assigning a value to m_duration isn't atomic so protect it.
                    // we might be reading the value on another thread which might
                    // result in reading an intermediate state.
                    std::scoped_lock<std::mutex> lock{ m_data_mtx };
                    m_duration = std::chrono::steady_clock::now() - m_start_time;

                    // using a statevariable to check for stopping means it can respond 
                    // during the one second delay and stop immediately. 
                    // this is an advantage over using sleep
                } while (!m_state.try_wait_for(State::stopping, std::chrono::seconds(1)));

                m_state = State::stopped;
            });

            // Wait for the thread to have really started
            // this way we have a clear post condition for start
            m_state.wait_for(State::running);
        }

        void Stop()
        {
            // only allow a running Chronometer to be stopped.
            // in all other states Stop does nothing
            if (m_state.set_if(State::running, State::stopping) == State::stopping)
            {
                // synchronization with stopped state, as set by other thread
                m_state.wait_for(State::stopped);

                // future get is not really needed for synchronization.
                // but if thread threw an exception it's rethrown here 
                m_future.get();
            }
        }

        ~ChronoMeter()
        {
            // Automatically stop thread if this hasn't already happened.
            Stop();
        }

        const ElapsedTime Elapsed() const
        {
            std::scoped_lock<std::mutex> lock{ m_data_mtx };
            return ElapsedTime{ m_duration };
        }

    private:
        std::future<void> m_future;
        StateVariable<State> m_state;
        mutable std::mutex m_data_mtx;
        std::chrono::steady_clock::time_point m_start_time;
        std::chrono::steady_clock::duration m_duration;
    };


    int main()
    {
        ChronoMeter meter1;
        ChronoMeter meter2;
    
        meter1.Start();
        std::this_thread::sleep_for(std::chrono::seconds(5));

        auto elapsed_1_1 = meter1.Elapsed();
        std::cout << "Meter 1 elapsed time " << elapsed_1_1 << std::endl;


        meter2.Start();
        std::this_thread::sleep_for(std::chrono::seconds(4));

        auto elapsed_1_2 = meter1.Elapsed();
        auto elapsed_2 = meter2.Elapsed();

        std::cout << "Meter 1 elapsed time " << elapsed_1_2 << std::endl;
        std::cout << "Meter 2 elapsed time " << elapsed_2 << std::endl;

        meter1.Stop();
        // not stopping meter2 (and it's thread) explicitly, this is done safely by destructor if needed

        return 0;
    }

CodePudding user response:

I usually solve this problem by making the multithreaded object handle everything that has to do with multithreading, here's how I solved it in your case (I ended up rewriting a lot of things so maybe the behavior isn't exactly the one you want, you can use my code as a starting point):

main.cpp:

#include <iostream>
#include <vector>

#include "mychrono.hpp"

int main()
{
    std::vector<Chronometer*> car_crono;

    for(int i = 0; i < 2; i  )
    {
        car_crono.push_back(new Chronometer);
    }

    while (1) {
//        std::cout << "hello-world" << std::endl;
        Chronometer::Time t = car_crono[0]->get_time();

        if(t.sec >= 10)
            car_crono[0]->reset_chrono();

        std::cout << "Seconds of T0: " << t.sec << std::endl;
        std::cout << "T1: " << car_crono[1]->to_string() << std::endl;
    }

    car_crono.clear();
}

mychrono.hpp:

#ifndef mychrono_hpp
#define mychrono_hpp

#include <iostream>
#include <thread>
#include <memory>
#include <condition_variable>
#include <mutex>
#include <atomic>
 
class Chronometer
{
    public:
        struct Time {
            int hour;
            int min;
            int sec;
        };

        Chronometer();
        void reset_chrono();
        friend std::ostream& operator<<(std::ostream& flux, Chronometer& t);
        Chronometer& operator=(const Chronometer& other);
        std::string to_string();
        Time get_time();
        ~Chronometer();

     private:
        Time currentTime;
        std::mutex timeMutex;
        std::condition_variable conditionVariable;
        std::unique_ptr<std::thread> thread;
        std::mutex CVMutex;
        std::atomic<bool> exitNow;

        void thread_function();
};
 
#endif

mychrono.cpp:

#include "mychrono.hpp"

Chronometer::Chronometer() : currentTime.hour(0), currentTime.min(0), currentTime.sec(0)
{
    thread.reset(new std::thread(&Chronometer::thread_function, this));
}

void Chronometer::reset_chrono()
{
    std::lock_guard<std::mutex> lock(timeMutex);

    currentTime.hour = 0;
    currentTime.sec = 0;
    currentTime.min = 0;
}

std::ostream& operator<<(std::ostream& flux, Chronometer& t)
{
    flux << t.to_string();
    return flux;
}

Chronometer& Chronometer::operator=(const Chronometer& other)
{
    // Guard self assignment
    //if (this == &other)
    return *this;
}

std::string Chronometer::to_string()
{
    std::lock_guard<std::mutex> lock(timeMutex);

    return std::to_string(currentTime.hour)   ":"   std::to_string(currentTime.min)   ":"   std::to_string(currentTime.sec);
}

Time Chronometer::get_time()
{
    return currentTime;
}
 
Chronometer::~Chronometer()
{
    exitNow = true;
    
    {
        std::unique_lock<std::mutex> lock(CVMutex);

        lock.unlock();
        conditionVariable.notify_all();
    }

    thread->join();
}

void Chronometer::thread_function()
{
    std::unique_lock<std::mutex> waitLock(CVMutex);

    while(!exitNow)
    {
        sec  ;
 
        if(currentTime.sec > 59) {
            std::lock_guard<std::mutex> lock(timeMutex);

            currentTime.min  ;
            currentTime.sec = 0;
        }
 
        if(currentTime.min > 59) {
            std::lock_guard<std::mutex> lock(timeMutex);

            currentTime.hour  ;
            currentTime.sec = 0;
            currentTime.min = 0;
        }

//        std::cout << "chrono: " << *this << std::endl; //Not thread safe be careful

        conditionVariable.wait_for(waitLock, std::chrono::seconds(1));
    }
}

EDIT: About your latest comment: you don't need to reset a chrono in its destructor as the data will be destroyed anyway. If you want to reset the counter while it's running you want to call Chronometer::reset_chrono() from you main function.

For the second part of your comment, I added a get_time function to the code (I also added a mutex to avoid data races, I completly forgot when I wrote the original answer). When you want to get the current time of a chrono from the main function you just call get_time() and use the struct it returns to get the info you want.

I added a small example to show how to use both functions. As you can see, the main function doesn't even need to know what threads are !

I may be wrong but from the questions you ask I feel maybe you're not used to how multithreading works. It's a very difficult concept and one of the few I feel you can't learn only through experience, if that's the case you might want to learn about it from dedicated sites such as this one. I think I pieced together that you speak french, here's a really good article (that was never finished apparently) about the theory of it and another one in french, more about the specifics of C . If you understand the core concepts and just have a hard time with my code, I plan on commenting it all but for now Pepijn Kramer did a great job explaining what they did in their response.

  • Related