Home > Software design >  Read input from threads one by one
Read input from threads one by one

Time:09-29

I have 2 threads in my program which:

  • receive input from keyboard
  • calculate some math
  • output result
DWORD WINAPI F1(LPVOID lpParameter)
{    
    // std::cin >> data
    // calculate
    
    return 0;
}

DWORD WINAPI F2(LPVOID lpParameter)
{
    // std::cin >> data
    // calculate
    
    return 0;
}

int main()
{
    thread_handles[0] = CreateThread(NULL,
                                     0,
                                     F1,
                                     NULL,
                                     0,
                                     NULL);
    assert(thread_handles[0] != NULL);
    
    thread_handles[1] = CreateThread(NULL,
                                     0,
                                     F2,
                                     NULL,
                                     0,
                                     NULL);
    assert(thread_handles[1] != NULL);
    
    WaitForMultipleObjects(THREADS_COUNT,
                           thread_handles,
                           TRUE,                           
                           INFINITE);
 
    CloseHandle(thread_handles[0]);
    CloseHandle(thread_handles[1]);
    
    return 0;
}

But when I run my program they all at the same time read input from terminal. How to fill them one by one and calculate then parallel?

CodePudding user response:

The pattern you are looking for is called producer-consumer

You should read your data into some memory by one thread - producer, then you share this memory with one or multiple threads - consumers. Each consumer performs their calculation.

Such as, the raw WinAPI - Thread Pool API - can be used for implementing consumers instead of reinventing the wheel by grouping threads.

As well, the C standard library can be used for its threading API.

The following example asks the user to input two vectors of integers and then calculate their sum in parallel. Here the main application thread is a producer, where the thread pool, ie group of threads, are the consumers.

#include <windows.h>

#include <numeric>
#include <vector>
#include <iostream>

// those to just for demonstrating C   standard library API for solving same problem
#include <atomic>
#include <thread>

// utility function to read count integers from console user input into vector
void read_ints_to_vec(std::vector<int>& to, std::size_t count)
{
    int v = 0;
    for(std::size_t i=0; i < count; i  ) {
        std::cin >> v;
        to.emplace_back( v );
    }
}

// utility function to pretty print the vector
template<typename iterator_type>
void print_iterable(const iterator_type& begin,const iterator_type& end)
{
    if(begin != end) {
        auto it = begin;
        std::cout << "{ " << *it;
        while(  it != end) {
            std::cout << ", ";
            std::cout << *it;
        }
        std::cout << " } ";
    }
}

// Prints last system error into system error stream in case of any OS error
int print_last_error()
{
    ::DWORD ret =::GetLastError();
    if( NO_ERROR != ret ) {
        wchar_t msg[512];
        ::DWORD len = ::FormatMessageW(
                          FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS,
                          NULL, ret,
                          MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
                          (::LPWSTR) &msg,
                          256, NULL );
        ::DWORD written;
        if( !::WriteFile( ::GetStdHandle(STD_ERROR_HANDLE), msg, len, &written, nullptr ) ) {
            MessageBoxExW(NULL, msg, NULL, MB_OK | MB_ICONERROR, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT) );
        }
    }
    return ret;
}

// Returns number of system logical CPUs i.e. cores threads
unsigned int hardware_concurrency()
{
    SYSTEM_INFO sysinfo;
    GetSystemInfo(&sysinfo);
    return sysinfo.dwNumberOfProcessors;
}

// this is a context i.e. message to be passed from
// producer main thread to consumer
struct vector_sum_context {
    const std::vector<int>* v;
    intmax_t sum;
};

// this is thread pull routine i.e. consumer task calculating sum of integers vector
void CALLBACK sum_vector_routine(PTP_CALLBACK_INSTANCE,PVOID context,PTP_WORK)
{
    auto ctx = static_cast< vector_sum_context* >(context);
    ctx->sum = std::accumulate(ctx->v->cbegin(), ctx->v->cend(), 0);
}

int main(int argc, const char** argv)
{    
    std::vector<int> v0, v1;
    std::cout << "Please input first vector of 4 integers to sum them " << std::endl;
    read_ints_to_vec(v0, 4);
    std::cout << "Please input second vector of 4 integers to sum them " << std::endl;
    read_ints_to_vec(v1, 4);


    /* create thread pull */
    PTP_POOL tp = ::CreateThreadpool(nullptr);
    if(nullptr == tp)
        return print_last_error();
    if( FALSE == ::SetThreadpoolThreadMinimum(tp, 1) ) {
        ::CloseThreadpool(tp);
        return print_last_error();
    }
    SetThreadpoolThreadMaximum(tp, hardware_concurrency());
    PTP_CLEANUP_GROUP cleanup_group = ::CreateThreadpoolCleanupGroup();
    if(nullptr == cleanup_group) {
        CloseThreadpool(tp);
        return print_last_error();
    }

    TP_CALLBACK_ENVIRON cbenv;
    InitializeThreadpoolEnvironment(&cbenv);
    SetThreadpoolCallbackCleanupGroup(&cbenv, cleanup_group, [](PVOID, PVOID) {});

    /* Now submit two tasks into our thread pull */
    vector_sum_context c0 {&v0, 0};
    PTP_WORK work0 = ::CreateThreadpoolWork(sum_vector_routine, &c0, &cbenv);
    SubmitThreadpoolWork(work0);

    vector_sum_context c1 {&v1, 0};
    PTP_WORK work1 = ::CreateThreadpoolWork(sum_vector_routine, &c1, &cbenv);
    SubmitThreadpoolWork(work1);

    /*
      join thread pool
      i.e. wait until all asynchronous operations to accomplish
    */
    CloseThreadpoolCleanupGroupMembers(cleanup_group, FALSE, nullptr);
    /* Clean up the cleanup group and close thread poll  */
    CloseThreadpoolCleanupGroup(cleanup_group);
    DestroyThreadpoolEnvironment(&cbenv);
    CloseThreadpool(tp);

    // Display results
    std::cout << "Sums calculated by windows thread pool" << std::endl;

    std::cout << "Vector 0 ";
    print_iterable(v0.cbegin(), v0.cend());
    std::cout << " sum = " << c0.sum << std::endl;

    std::cout << "Vector 1 ";
    print_iterable(v1.cbegin(), v1.cend());
    std::cout << " sum = " << c1.sum << std::endl;

    // Same thing implemented with C   standard library
    std::atomic_intmax_t sum0 = 0, sum1 = 0;
    std::thread t0( [&v0, &sum0] { sum0.store( std::accumulate(v0.cbegin(), v0.cend(), 0) ); } );
    std::thread t1( [&v1, &sum1] { sum1.store( std::accumulate(v1.cbegin(), v1.cend(), 0) ); } );
    t0.join();
    t1.join();

    std::cout << "Sums calculated by C   thread API" << std::endl;

    std::cout << "Vector 0 ";
    print_iterable(v0.cbegin(), v0.cend());
    std::cout << " sum =" << sum0.load() << std::endl;

    std::cout << "Vector 1 ";
    print_iterable(v1.cbegin(), v1.cend());
    std::cout << " sum =" << sum1.load() << std::endl;

    return 0;
}
  • Related