How to implement and profile pthreads continuously created on the heap?

Time:07-14

Introduction

I have a program where child threads are created that I would like to profile with Valgrind memcheck. From the responses to a previous question I've asked, I will need to use joinable (rather than detached) threads in order to test and profile reliably with Valgrind memcheck.

Stack vs. Heap Allocation

My program is large enough that I don't think I can create a thread and join it in the same scope. For this reason I allocate space for the pthread_t on the heap.

Attempt #1 - Joining Immediately

#include <stdlib.h>
#include <stdio.h>
#include <pthread.h>
#include <stdint.h>

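// Thread entry points must have the signature void* (*)(void*).
void* my_thread(void* arg) {
  (void) arg;  // unused
  printf("I'm in a child thread!\n");
  return NULL;  // same effect as pthread_exit(NULL)
}

pthread_t* make_thread(void) {
  pthread_t* const thread = malloc(sizeof(pthread_t));
  pthread_create(thread, NULL, my_thread, NULL);
  return thread;
}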

int main() {
  printf("Hello, world!\n");
  uint8_t i;
  for(i = 0; i < 255; ++i) {
    pthread_t* const thread_handle = make_thread();
    pthread_join(*thread_handle, NULL);
    free(thread_handle);
  }
  return 0;
}

This seems to make sense, but now I want to extend the example so that threads are not joined immediately, only on program exit (say, because these threads may become long-lived). In other words, joining each thread right after creating it defeats the purpose of multithreading.

I want to create threads and only really ever force a join on program exit.

Attempt #2 - Joining at the end

#include <stdlib.h>
#include <stdio.h>
#include <pthread.h>
#include <stdint.h>
#include <glib-2.0/glib.h>
#include <unistd.h>

// Thread entry points must have the signature void* (*)(void*).
void* my_thread(void* arg) {
  (void) arg;  // unused
  sleep(3);
  printf("I'm in a child thread!\n");
  return NULL;  // same effect as pthread_exit(NULL)
}

pthread_t* make_thread(void) {
  pthread_t* const thread = malloc(sizeof(pthread_t));
  pthread_create(thread, NULL, my_thread, NULL);
  return thread;
}

int main() {
  printf("Hello, world!\n");

  GArray* const thread_handles = g_array_new(TRUE, TRUE, sizeof(pthread_t*));

  // Important loop
  uint8_t i;
  for(i = 0; i < 255; ++i) {
    pthread_t* const thread_handle = make_thread();
    g_array_append_val(thread_handles, thread_handle);
  }

  for(i = 0; i < thread_handles->len; ++i) {
    pthread_t* const thread_handle =
        g_array_index(thread_handles, pthread_t*, i);
    pthread_join(*thread_handle, NULL);
    free(thread_handle);
  }
  g_array_free(thread_handles, TRUE);

  return 0;
}

This is cool but what if "Important loop" is actually endless? How can I prevent thread_handles from expanding until it takes up all available memory?

In the actual program (these are just minimal examples), the program receives network messages and then kicks off threads for some special types of network messages.

Answer:

So, what is your real issue?

For normal network servers and similar programs, the usual approach is to combine both of your approaches.

The main thread runs two nested loops. The outer loop:

  1. Waits for a connection/message.
  2. Creates a thread.
  3. Adds it to the list of active threads.
  4. Runs the inner loop over all active threads in the list (as below).

The inner loop over the active thread list:

  1. Looks for a thread that is marked "done".
  2. Removes it from the list.
  3. Joins it.
  4. Frees the thread struct.

The above works pretty much the same if the thread control structs are allocated (via malloc) or come from a fixed, pre-defined array of structs [which can be function scoped to main or global/static scope].
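
As a minimal sketch of the fixed-array variant (not code from the original program): slot_t, MAX_TASKS, and the helpers spawn, reap_finished, and join_all are hypothetical names chosen for illustration. Because the array has a fixed size, the set of outstanding thread handles cannot grow without bound. When every slot is busy, spawn reports failure and the caller decides whether to wait or drop the request.

```c
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

#define MAX_TASKS 8   /* hypothetical cap on simultaneously live threads */

typedef struct {
    pthread_t   tid;      /* valid only while in_use is true */
    bool        in_use;   /* touched by the main thread only */
    atomic_bool done;     /* set by the worker, read by the main thread */
} slot_t;

static slot_t slots[MAX_TASKS];   /* zero-initialized: all slots free */

void *worker(void *arg) {
    slot_t *slot = arg;
    /* ... real work would go here ... */
    atomic_store(&slot->done, true);   /* tell main we're done */
    return NULL;
}

/* Join every finished thread and free its slot; returns how many were reaped. */
int reap_finished(void) {
    int reaped = 0;
    for (size_t i = 0; i < MAX_TASKS; ++i) {
        if (slots[i].in_use && atomic_load(&slots[i].done)) {
            pthread_join(slots[i].tid, NULL);
            slots[i].in_use = false;
            ++reaped;
        }
    }
    return reaped;
}

/* Start a worker in a free slot; false if all slots are busy or creation fails. */
bool spawn(void) {
    reap_finished();
    for (size_t i = 0; i < MAX_TASKS; ++i) {
        if (!slots[i].in_use) {
            atomic_store(&slots[i].done, false);
            if (pthread_create(&slots[i].tid, NULL, worker, &slots[i]) != 0)
                return false;
            slots[i].in_use = true;
            return true;
        }
    }
    return false;
}

/* Join everything that is still running, e.g. at program exit. */
int join_all(void) {
    int joined = 0;
    for (size_t i = 0; i < MAX_TASKS; ++i) {
        if (slots[i].in_use) {
            pthread_join(slots[i].tid, NULL);
            slots[i].in_use = false;
            ++joined;
        }
    }
    return joined;
}
```

The main loop would call spawn() for each incoming message and join_all() once before returning from main, so that no thread is left unjoined at exit.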


Here's some C-like pseudo code to illustrate:

// task control
typedef struct tsk {
    pthread_t tsk_tid;                      // thread id
    int tsk_sock;                           // socket/message struct/whatever
    int tsk_isdone;                         // 1=done
} tsk_t;

void *
my_thread(void *arg)
{
    tsk_t *tsk = arg;

    // do stuff ...

    // tell main we're done
    // (real code should make this flag atomic or guard it with a mutex)
    tsk->tsk_isdone = 1;

    return (void *) 0;
}

void
main_loop(void)
{

    while (1) {
        // wait for connection, message, whatever ...
        int sock = accept();

        // create thread to handle request
        tsk_t *tsk = make_thread(sock);

        // enqueue it to list of active threads
        list_enqueue(active_list,tsk);

        // join all completed threads
        while (1) {
            int doneflg = 0;

            // look for completed threads
            for_all(tsk,active_list) {
                if (! tsk->tsk_isdone)
                    continue;

                // remove task from queue/list
                list_remove(active_list,tsk);

                // join the thread
                pthread_join(tsk->tsk_tid,NULL);

                // release storage
                free(tsk);

                // say we reaped/joined at least one thread
                doneflg = 1;
            }

            // stop when we've joined as many threads as we can
            if (! doneflg)
                break;
        }
    }
}
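
The pseudo code above could be made compilable roughly as follows. This is a sketch under assumptions: task_t, task_main, task_start, and task_reap are illustrative names, the active list is a plain singly linked list, and the done flag is a C11 atomic so that the worker's write and the main thread's read do not race. Unlinking through a pointer-to-pointer makes it safe to remove an entry while walking the list.

```c
#include <pthread.h>
#include <stdatomic.h>
#include <stdlib.h>

typedef struct task {
    pthread_t    tid;
    atomic_int   done;      /* set to 1 by the worker just before it returns */
    struct task *next;      /* next entry in the active list */
} task_t;

void *task_main(void *arg) {
    task_t *t = arg;
    /* ... handle the request ... */
    atomic_store(&t->done, 1);
    return NULL;
}

/* Create a task and push it on the front of the list; returns 0 on success. */
int task_start(task_t **head) {
    task_t *t = malloc(sizeof *t);
    if (t == NULL)
        return -1;
    atomic_init(&t->done, 0);
    if (pthread_create(&t->tid, NULL, task_main, t) != 0) {
        free(t);
        return -1;
    }
    t->next = *head;
    *head = t;
    return 0;
}

/* Join and free tasks; returns how many were reaped.
 * wait_all == 0: only tasks that have already flagged themselves done.
 * wait_all != 0: every task, blocking until each one finishes. */
int task_reap(task_t **head, int wait_all) {
    int reaped = 0;
    task_t **link = head;
    while (*link != NULL) {
        task_t *t = *link;
        if (wait_all || atomic_load(&t->done)) {
            *link = t->next;              /* unlink before freeing */
            pthread_join(t->tid, NULL);
            free(t);
            ++reaped;
        } else {
            link = &t->next;              /* still running: skip it */
        }
    }
    return reaped;
}
```

Calling task_reap(&head, 0) after each task_start keeps the list no longer than the number of threads still running. A final task_reap(&head, 1) at program exit joins whatever is left.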

Note that while creating a new thread for a new connection may be reasonable, doing so for messages from a given connection can be very slow.

It may be better to have a pool of worker threads. See my answer: Relative merits between one thread per client and queuing thread models for a threaded server?
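
A worker pool might look something like the sketch below. It is not taken from the linked answer: pool_t, POOL_THREADS, QUEUE_CAP, and the pool_* functions are hypothetical names, and jobs are reduced to plain ints standing in for sockets or message structs. A fixed set of threads pulls jobs from a bounded queue, so there are only POOL_THREADS handles to join at exit, however many messages arrive.

```c
#include <pthread.h>

#define POOL_THREADS 4    /* hypothetical pool size */
#define QUEUE_CAP    16   /* hypothetical queue capacity */

typedef struct {
    pthread_t       threads[POOL_THREADS];
    int             nthreads;           /* threads actually started */
    int             queue[QUEUE_CAP];   /* pending jobs (ring buffer) */
    int             head, count;
    int             shutting_down;
    long            processed;          /* jobs handled so far */
    pthread_mutex_t lock;
    pthread_cond_t  not_empty, not_full;
} pool_t;

void *pool_worker(void *arg) {
    pool_t *p = arg;
    for (;;) {
        pthread_mutex_lock(&p->lock);
        while (p->count == 0 && !p->shutting_down)
            pthread_cond_wait(&p->not_empty, &p->lock);
        if (p->count == 0) {            /* shutting down and queue drained */
            pthread_mutex_unlock(&p->lock);
            return NULL;
        }
        int job = p->queue[p->head];
        p->head = (p->head + 1) % QUEUE_CAP;
        p->count--;
        pthread_cond_signal(&p->not_full);
        pthread_mutex_unlock(&p->lock);

        (void) job;                     /* ... handle the message here ... */

        pthread_mutex_lock(&p->lock);
        p->processed++;
        pthread_mutex_unlock(&p->lock);
    }
}

/* Start the workers; returns 0 on success, -1 if any thread failed to start. */
int pool_init(pool_t *p) {
    p->nthreads = p->head = p->count = p->shutting_down = 0;
    p->processed = 0;
    pthread_mutex_init(&p->lock, NULL);
    pthread_cond_init(&p->not_empty, NULL);
    pthread_cond_init(&p->not_full, NULL);
    for (int i = 0; i < POOL_THREADS; ++i) {
        if (pthread_create(&p->threads[i], NULL, pool_worker, p) != 0)
            return -1;
        p->nthreads++;
    }
    return 0;
}

/* Queue a job, blocking while the queue is full. */
void pool_submit(pool_t *p, int job) {
    pthread_mutex_lock(&p->lock);
    while (p->count == QUEUE_CAP)
        pthread_cond_wait(&p->not_full, &p->lock);
    p->queue[(p->head + p->count) % QUEUE_CAP] = job;
    p->count++;
    pthread_cond_signal(&p->not_empty);
    pthread_mutex_unlock(&p->lock);
}

/* Let workers drain the queue, join them all, return the number of jobs handled. */
long pool_shutdown(pool_t *p) {
    pthread_mutex_lock(&p->lock);
    p->shutting_down = 1;
    pthread_cond_broadcast(&p->not_empty);
    pthread_mutex_unlock(&p->lock);
    for (int i = 0; i < p->nthreads; ++i)
        pthread_join(p->threads[i], NULL);
    pthread_mutex_destroy(&p->lock);
    pthread_cond_destroy(&p->not_empty);
    pthread_cond_destroy(&p->not_full);
    return p->processed;
}
```

In this arrangement the endless receive loop calls pool_submit for each message, and pool_shutdown runs once at program exit. The blocking pool_submit applies back-pressure when the workers fall behind; dropping or rejecting the message instead would be an equally reasonable design choice.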
