Standard way of implementing C++ multi-threading for collecting data streams and processing

Time:03-31

I'm new to C++ development. I'm trying to run infinite functions that are independent of each other. The problem statement is similar to this:

[flow diagram image]

The way I'm trying to implement this is

#include <iostream>
#include <cstdlib>
#include <string>
#include <pthread.h>
#include <unistd.h>
#include <mutex>

int g_i = 0;
std::mutex g_i_mutex; // protects g_i

// increment g_i by 1
void increment_itr()
{
  const std::lock_guard<std::mutex> lock(g_i_mutex);
  g_i += 1;
}

void *fun(void *s)
{
  std::string str;
  str = (char *)s;
  std::cout << str << " start\n";
  while (1)
  {
    std::cout << str << " " << g_i << "\n";
    if(g_i > 1000) break;
    increment_itr();
  }
  pthread_exit(NULL);
  std::cout << str << " end\n";
}

void *checker(void *s) {
  while (1) {
    if(g_i > 1000) {
      std::cout<<"**********************\n";
      std::cout << "checker: g_i == 100\n";
      std::cout<<"**********************\n";
      pthread_exit(NULL);
    }
  }
}


int main()
{
  int itr = 0;
  pthread_t threads[3];
  pthread_attr_t attr;
  void *status;

  // Initialize and set thread joinable
  pthread_attr_init(&attr);
  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
  int rc1 = pthread_create(&threads[0], &attr, fun, (void *)&"foo");
  int rc2 = pthread_create(&threads[1], &attr, fun, (void *)&"bar");
  int rc3 = pthread_create(&threads[2], &attr, checker, (void *)&"checker");
  

  if (rc1 || rc2 || rc3)
  {
    std::cout << "Error:unable to create thread," << rc1 << rc2 << rc3 << std::endl;
    exit(-1);
  }

  pthread_attr_destroy(&attr);

  std::cout << "main func continues\n";

  for (int i = 0; i < 3; i++)
  {
    rc1 = pthread_join(threads[i], &status);
    if (rc1)
    {
      std::cout << "Error:unable to join," << rc1 << std::endl;
      exit(-1);
    }
    std::cout << "Main: completed thread id :" << i;
    std::cout << "  exiting with status :" << status << std::endl;
  }

  std::cout << "main end\n";

  return 0;
}

This works, but I'd like to know whether this implementation is a standard approach, or whether it can be done in a better way.

Answer:

You correctly take a lock inside increment_itr, but your fun function is accessing g_i without acquiring the lock.

Change this:

void increment_itr()
{
  const std::lock_guard<std::mutex> lock(g_i_mutex);
  g_i += 1;
}

To this:

int increment_itr()
{
  std::lock_guard<std::mutex> lock(g_i_mutex);  // the const wasn't actually needed
  g_i = g_i + 1;
  return g_i;  // return the updated value of g_i
}

This is not thread safe:

if(g_i > 1000) break;  // access g_i without acquiring the lock
increment_itr();

This is better:

if (increment_itr() > 1000) {
    break;
}

A similar fix is needed in checker:


void *checker(void *s) {
  while (1) {
    int i;
    {
      // copy g_i while holding the lock; the lock is released at the end of this scope
      std::lock_guard<std::mutex> lock(g_i_mutex);
      i = g_i;
    }
    if(i > 1000) {
      std::cout<<"**********************\n";
      std::cout << "checker: g_i > 1000\n";
      std::cout<<"**********************\n";
      break;
    }
  }
  return NULL;
}
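
Putting these fixes together, here is a minimal sketch of the counter with every read and write of g_i done under the lock. It swaps pthread_create for std::thread purely for brevity, which is an assumption on my part and not something the original code requires. The helpers read_itr, worker and run_workers are hypothetical names introduced only for this illustration.

```cpp
#include <mutex>
#include <thread>
#include <vector>

int g_i = 0;
std::mutex g_i_mutex; // protects g_i

// Increment g_i under the lock and return the updated value.
int increment_itr()
{
  std::lock_guard<std::mutex> lock(g_i_mutex);
  g_i = g_i + 1;
  return g_i;
}

// Hypothetical helper: read g_i under the same lock.
int read_itr()
{
  std::lock_guard<std::mutex> lock(g_i_mutex);
  return g_i;
}

// Hypothetical worker: keep incrementing until the value it gets back exceeds limit.
void worker(int limit)
{
  while (increment_itr() <= limit) {
    // the real per-iteration work would go here
  }
}

// Hypothetical driver: reset the counter, run the workers, return the final value.
int run_workers(int num_workers, int limit)
{
  {
    std::lock_guard<std::mutex> lock(g_i_mutex);
    g_i = 0;
  }
  std::vector<std::thread> threads;
  for (int k = 0; k < num_workers; ++k) {
    threads.emplace_back(worker, limit);
  }
  for (std::thread &t : threads) {
    t.join();
  }
  return read_itr();
}
```

Calling run_workers(2, 1000) from main returns 1002: each worker performs exactly one increment that lands past the limit before it stops, so the counter overshoots by the number of workers. If that overshoot matters, the limit test and the increment would have to happen under the same lock acquisition.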

As to your design question, here's the fundamental issue.

You're proposing a dedicated thread that continuously takes a lock and does some sort of checking on a data structure. If a certain condition is met, it does additional processing, such as writing to a database. A thread spinning in an infinite loop is wasteful if nothing in the data structure (the two maps) has changed. Instead, you only want the integrity check to run when something changes. You can use a condition variable to have the checker thread pause until something actually changes.

Here's a better design.


uint64_t g_data_version = 0;
std::condition_variable g_cv;  // requires <condition_variable>

void *fun(void *s)
{
    while (true) {

        // << wait for data from the source >>

        {
            std::lock_guard<std::mutex> lock(g_i_mutex);
            // update the data in the map while under a lock
            // e.g. g_n++;
            //

            // increment the data version to signal a new revision has been made
            g_data_version += 1;
        }

        // notify the checker thread that something has changed
        g_cv.notify_all();
    }

}

Then your checker function only wakes up when fun signals that something has changed.

void *checker(void *s) {
  while (1) {

      // lock the mutex
      std::unique_lock<std::mutex> lock(g_i_mutex);

      // do the data comparison check here

      // now wait for the data version to change
      uint64_t version = g_data_version;
      while (version == g_data_version) { // loop guards against spurious wake ups
         g_cv.wait(lock); // this atomically unlocks the mutex and waits for a notify call on another thread
      }
  }
}
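
For reference, here is a self-contained sketch of the same version-counter pattern that can actually terminate. It is only an illustration under a few assumptions: it uses std::thread instead of pthreads, the producer performs a fixed number of updates instead of waiting on a real data source, and the g_done flag plus the producer, checker_loop and run_pipeline names are hypothetical additions that are not part of the design above.

```cpp
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <thread>

std::mutex g_i_mutex;              // protects g_data_version and g_done
std::condition_variable g_cv;
std::uint64_t g_data_version = 0;
bool g_done = false;               // hypothetical shutdown flag

// Hypothetical producer: publish a fixed number of updates, then signal shutdown.
void producer(int updates)
{
  for (int k = 0; k < updates; ++k) {
    {
      std::lock_guard<std::mutex> lock(g_i_mutex);
      // update the shared data here, then bump the version
      g_data_version += 1;
    }
    g_cv.notify_all();
  }
  {
    std::lock_guard<std::mutex> lock(g_i_mutex);
    g_done = true;
  }
  g_cv.notify_all();
}

// Hypothetical checker: sleep until the version changes or shutdown is requested.
// Returns the last version it observed.
std::uint64_t checker_loop()
{
  std::uint64_t seen = 0;
  std::unique_lock<std::mutex> lock(g_i_mutex);
  while (true) {
    // the predicate form of wait re-checks the condition after every wake up
    g_cv.wait(lock, [&] { return g_data_version != seen || g_done; });
    if (g_data_version != seen) {
      seen = g_data_version;
      // run the integrity check on the shared data here (the lock is held)
      continue;
    }
    return seen; // nothing new and shutdown was requested
  }
}

// Hypothetical driver: run one producer and one checker to completion.
std::uint64_t run_pipeline(int updates)
{
  std::uint64_t last_seen = 0;
  std::thread consumer([&last_seen] { last_seen = checker_loop(); });
  std::thread source(producer, updates);
  source.join();
  consumer.join();
  return last_seen;
}
```

The checker may coalesce several updates into one wake up, so it does not necessarily run once per update, but it always observes the final version before returning. Any long-running work, such as a database write, would be better done after copying the data and releasing the lock so the producers are not blocked.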
