Home > Enterprise >  Threadpool execution stops at an arbitrary point in C
Threadpool execution stops at an arbitrary point in C

Time:12-30

I am implementing my own generic Threadpool algorithm in C, using the fibonacci sequence for test purposes, and for the last few days I have been stuck with a problem that completely manages to baffle me.

When executing the program it will work until at some point it suddenly stops for no reason that is readily apparent for me.

The one thing I noticed is that the execution stops after some small amount of time, as it stops earlier in the execution if print commands or sleep commands are added to it.

EDIT: Missed this part, I already tested for Deadlocks and there are none, it seems to just not push any new things onto the stack at some point, leading to all threads just trying to pull from the stack, recognising it's empty and jumping back up just to repeat that process ad-infinitum.

Here is the code:

threadpool.h

#ifndef THREADPOOL_H_INCLUDED
#define THREADPOOL_H_INCLUDED

#include <stddef.h>
#include <stdbool.h>

typedef void (*ThreadTask_f)(void*);

typedef struct Future {
    ThreadTask_f fn;   //Pointer to the to be executed function
    bool fulfilled;
} Future;


extern int tpInit(size_t size);

extern void tpRelease(void);

extern void tpAsync(Future *future);

extern void tpAwait(Future *future);

/* creates an abstraction for easy interaction of functions with the threadpool
 * TYPE: type that the function returns
 * NAME: name of the function to be parralelised
 * ARG: type of the argument of the function given
*/
#define TASK(TYPE, NAME, ARG) \
    TYPE NAME(ARG); \
    \
    typedef struct { \
        Future fut;  \
        ARG    arg;  \
        TYPE   res;  \
    } NAME ## _fut;  \
    \
    static void NAME ## Thunk(void *args) { \
        NAME ## _fut *data = args;          \
        data->res = NAME(data->arg);        \
    } \
    static inline NAME ## _fut NAME ## Future(ARG arg) { \
        return (NAME ## _fut) {                          \
            .fut = { .fn = &NAME ## Thunk, .fulfilled = false },             \
            .arg = arg                                   \
        };                                               \
    } \
    static inline NAME ## _fut* NAME ## Async(NAME ## _fut *future) { \
        tpAsync(&future->fut);                 \
        return future;                         \
    } \
    static inline TYPE NAME ## Await(NAME ## _fut *future) { \
        tpAwait(&future->fut);        \
        return future->res;           \
    }

#endif

threadpool.c


#include "threadpool.h"

#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>
#include <time.h>

#define THREADSTACKSIZE 8388608
#define INITSTACKSIZE 1024  //initial value for how many Tasks can be in the taskstack
#define STACKMEMMULT 2  //if the TaskStack is full, multiply by this


typedef struct TaskStack {
    Future **start;
    size_t size;
    long current;
} TaskStack;

typedef struct ThreadPool {
    size_t size;
    pthread_t *threads;
    TaskStack *stack;
} ThreadPool;

static pthread_mutex_t stackAccess;

static ThreadPool *tp;

void nsleep(unsigned long nano) {
    struct timespec delay = {
        .tv_sec = 0,
        .tv_nsec = nano
    };
    nanosleep(&delay, NULL);
}

static void push(Future *future){
    pthread_mutex_lock(&stackAccess);
        if(tp->stack->current  ==tp->stack->size){
            tp->stack->size*=2;
            tp->stack->start=realloc(tp->stack->start, tp->stack->size);
        }
        tp->stack->start[tp->stack->current]=future;
    pthread_mutex_unlock(&stackAccess);
}

static Future *pull(){
    Future *retVal=NULL;
    PULLBEGIN:
    pthread_mutex_lock(&stackAccess);
        if(tp->stack->current==-1){ //if there is nothing on the stack test if there is a cancel attempt and yield the scheduler to a thread that might add tasks.
            pthread_mutex_unlock(&stackAccess);
            pthread_testcancel();
            sched_yield();
            goto PULLBEGIN;
        }
        retVal=tp->stack->start[tp->stack->current];
        tp->stack->current--;
    pthread_mutex_unlock(&stackAccess);
    return retVal;
}

static void *workerThread(void *args){
    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);

    Future *fut;
    while(true){
        fut=pull();
        fut->fn(fut);
        fut->fulfilled=true;
        pthread_testcancel();
    }

    return NULL;
}

int tpInit(size_t size) {
    
    int err;
    tp=NULL;
    accessStack=0;
    pushExisting=0;
    pthread_mutex_init(&stackAccess, NULL);
    tp=malloc(sizeof(ThreadPool));
    if(tp==NULL){
        err=0;
        goto ERRHANDLINIT;
    }
    tp->size=0;
    tp->stack=malloc(sizeof(TaskStack));
    if(tp->stack==NULL){
        err=1;
        goto ERRHANDLINIT;
    }
    tp->threads=malloc(sizeof(pthread_t)*size);
    if(tp->threads==NULL){
        err=2;
        goto ERRHANDLINIT;
    }
    tp->stack->start=malloc(sizeof(Future *)*INITSTACKSIZE);
    if(tp->stack->start==NULL){
        err=3;
        goto ERRHANDLINIT;
    }
    tp->stack->current=-1;
    tp->stack->size=INITSTACKSIZE; 
    pthread_attr_t attributes;
    if(pthread_attr_init(&attributes)!=0){
        err=4;
        goto ERRHANDLINIT;
    }
    if(pthread_attr_setstacksize(&attributes, THREADSTACKSIZE)!=0){
        err=5;
        goto ERRHANDLINIT;
    }

    if(pthread_attr_setdetachstate(&attributes, PTHREAD_CREATE_JOINABLE)!=0){
        err=6;
        goto ERRHANDLINIT;
    }

    for(int i=0; i<size;i  ){
        if(pthread_create(&(tp->threads[i]), &attributes, workerThread,NULL)!=0){
            err=20 i;
            goto ERRHANDLINIT;
        }
    }
    return 0;

    ERRHANDLINIT:
    perror("Problem while initiating the threadpool with the following errcode: ");
    fprintf(stderr,"%i\n", err);
    return -1;
}

void tpRelease(void) {
    for(int i=0; i<tp->size; i  ){
        pthread_cancel(tp->threads[i]);
        pthread_join(tp->threads[i], NULL);
    }
    free(tp->stack->start);
    free(tp->stack);
    free(tp->threads);
    free(tp);
}

void tpAsync(Future *future) {
    future->fulfilled=false;
    push(future);
    return;
}

void tpAwait(Future *future) {
    while(!future->fulfilled){
        Future *workFut=pull();
        workFut->fn(workFut);
        workFut->fulfilled=true;
    }
}

main.c

#include "threadpool.h"

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>


static TASK(long, fib, long);

long fib(long n) {
    if (n <= 1){
        return n;
    }

    fib_fut *a = fibAsync((fib_fut[]) { fibFuture(n - 1) });
    fib_fut *b = fibAsync((fib_fut[]) { fibFuture(n - 2) });

    return fibAwait(a)   fibAwait(b);
}

int main() {
    if (tpInit(8) != 0)
        perror("Thread Pool initialization failed"), exit(-1);
    atexit(&tpRelease);
    
    for (long i = 0; i <= 100;   i)
        printf("fib(%2li) = %li\n", i, fib(i));
    
    return 0;
}

Makefile

#!/usr/bin/make
.SUFFIXES:
.PHONY: all run pack clean

SRC = $(wildcard *.c)
OBJ = $(SRC:%.c=%.o)
TAR = threadpool

CFLAGS = -std=gnu11 -c -g -Os -Wall -MMD -MP
LFLAGS = -pthread

DEP = $(OBJ:%.o=%.d)
-include $(DEP)

%.o: %.c
    $(CC) $(CFLAGS) $< -o $@

$(TAR): $(filter-out quicksort.o,$(OBJ))
    $(CC) $(LFLAGS) -o $@ $^

all: $(TAR)

run: all
    ./$(TAR)

clean:
    $(RM) $(RMFILES) $(OBJ) $(TAR) bench $(DEP) $(PCK)

I really hope you have some idea what is happening. Thank you in advance.

CodePudding user response:

So I figured it out, with the gracious help of Craig Estey and Amit (which you can see in the comments under the original post).

So in the end it was a Deadlock, because, as you can still see in the original post which I will not modify so anyone interested has a chance to gaze upon my folly.

This happened because at one point there will be 6 threads waiting to pull, the stack is empty, and the two remaining threads is one going into await, and the other just having fulfilled it's given function, which is one that didn't call another recursively (in our example one with fib(0) or fib(1)). Now that that is finished the thread, let's call it thread 7, going into fib_await() will check if the value that it is waiting for is fulfilled, which at this point it isn't yet, thus it checks if there are any other in the stack. As there are none, it is stuck in wait.

Now the other thread, thread 8, the one that just fulfilled it's given function marks it's future as fulfilled and tries to pull another future. As it is empty, it too will stay in pull.

Now all threads are stuck in pull and none can progress as the one that is waiting on another would first have to leave pull().

My only modifications came for pull(), push(), tpAwait(), tpInit(), and workerThread() as I also implemented a very simple ticket lock.

  • Related