threads to read file size in c (Mutex problem)

Time:10-06

I have a school project in which we have to write a program that uses threads to calculate the size of the files in a directory. We need to use lstat to get the block size of each file. My solution is to create all the threads first; the main thread then goes through the directory and adds each file it finds to a queue. The worker threads take a file from the queue and get its block size with lstat. The picture shows how I was thinking when I designed this. [image omitted]

The program works as I expected. The problem is that every thread I add makes the program take longer to finish: with one thread it takes 5 seconds, but with 2 threads it takes 7 seconds.

I will show some of the code I wrote. I think I am misusing the mutex, because it blocks the main thread from adding to the queue, and all the worker threads are blocked while main is adding to the queue. So if you see something I am missing with the mutex, please tell me :).

/* If the -j flag is used with more than 1 thread */
if(flag_j == 1 && threads > 1) {

    /* Create each thread */
    createThreads(th, threads);

    /* Make a task of each file and add task to queue */
    makeTask(argv);

    /* Tell that no more file will be added to the queue */
    pthread_mutex_lock(&mutex);
    endThreads = 1;
    pthread_mutex_unlock(&mutex);

    /* Join threads */
    joinThreads(th, threads);
}

/**
 * Creates all the threads that were requested
 * with the -j flag at start.
 * @param th An array of all the threads.
 * @param threads The number of threads.
 * 
 */
void createThreads(pthread_t th[], int threads) {
    for(int i = 0; i < threads; i++) {
        /* sizeof(*a) is the size of an int; sizeof(a) would be the size of the pointer */
        int *a = malloc(sizeof(*a));
        *a = i;
        /* pthread create */
        if(pthread_create(&th[i], NULL, &getQueue, a) != 0) {
            perror("Failed to create thread");
            exit(EXIT_FAILURE);
        }    
    }
}


/**
 * Each thread takes tasks from the queue and executes them until
 * the queue is empty and the endThreads flag is set to 1.
 * @param arg Heap-allocated thread index; unused, so it is freed immediately.
 * 
*/
void *getQueue(void *arg) {

    free(arg);
    while(1) {
        Task task;
    
        /* Lock */
        pthread_mutex_lock(&mutex);
        if(queueCount > 0) {
            /* Get the first task in queue */
            task = queue[0];
            /* Move all the task in the queue forward one step */
            for (int i = 0; i < queueCount - 1; i++)
            {
                queue[i] = queue[i + 1];
            }
            queueCount--;
            /* execute the task */
            pthread_mutex_unlock(&mutex);
            execute(&task);
        } else if ( endThreads == 1) {
            pthread_mutex_unlock(&mutex);
            break;
        } else {
            pthread_mutex_unlock(&mutex);
        }
    
    }

    /* The thread just returns NULL; arg was freed above, so it must not be returned */
    return NULL;
}


/**
 * Find the correct struct in the list of arguments.
 * Then add the size of the file to that struct and
 * add the struct back in to the list.
 * @param task The task that needs to be executed.
 * 
*/
void execute(Task* task) {

    int i = 0;
    char *path = malloc(1024 * sizeof(*path) + 1);

    /* Get the first name of the path */
    while (task->file[i] != '\0')
    {   
        /* Break when it finds the first "/" (a leading "/" at index 0 is kept) */
        if (task->file[i] == '/' && i != 0)
        {
             break;
        }
    
        path[i] = task->file[i];
        i++;
    }

    path[i] = '\0';


    pthread_mutex_lock(&mutex1);
    for (int j = 0; j < numOfArg; j++)
    {
        Arguments entity;
        /* Get the first struct in listOfArg */
        entity = listOfArg[0];

        /* Move all the structs one step forward */
        for (int k = 0; k < numOfArg - 1; k++){
            listOfArg[k] = listOfArg[k + 1];
        }
    
        /* Check if the struct name is the same name as the path */
        if (strcmp(path, entity.name) == 0)
        {
            /* Add the size of the file to the struct size */
            entity.size += getFileSize(task->file);
            /* Add the struct back to the list */
            listOfArg[numOfArg - 1] = entity;
            break;
        } else {
            /* Add the struct back to the list */
            listOfArg[numOfArg - 1] = entity;
        }   
    }

    /* Get the size of the file and add it to the total size */
    totalSize += getFileSize(task->file);
    pthread_mutex_unlock(&mutex1);
    free(path);
}

/**
 * Checks whether each argument is a directory or a file.
 * If it is a directory, it is opened and its files are collected.
 * If it is a file, a Task is made for it
 * and the task is added to the queue.
 * @param argv An array of the arguments.
 * 
*/
void makeTask(char *argv[]) {
    while(argv[optind] != NULL) {
        if (isDirectory(argv[optind]))
        {
            filesDir(argv[optind]);
        } else {
            Task t;
            t.file[0] = '\0';
            strcpy(t.file, argv[optind]);
            addTask(t);
        }
    
        optind++;
    }
}

/**
 * Adds the task to the queue of tasks for the threads
 * to execute.
 * @param task The task to add to the queue of tasks.
 * 
*/
void addTask(Task task) {
    pthread_mutex_lock(&mutex);
    queue[queueCount] = task;
    queueCount++;
    pthread_mutex_unlock(&mutex);
}

User response:

The comments express valid concerns about which kinds of operations multithreading can speed up and which it can't, but they seem largely to have latched onto one plausible explanation for your observations without considering others. I/O considerations will limit the gains achievable from parallelization, but operating systems implement measures to try to reduce the impacts of slow I/O, such as readahead and data and metadata caching. Therefore, it is not necessarily the case that an I/O bottleneck will be sufficient to completely overcome all speedup from parallel computation in this case. Your classmates' observations cannot be discounted here, though it is not necessarily safe to assume that their programs are correct, either.

In fact, there are multiple other issues with your code that contribute to poor multithreaded performance and scaling, including

  • Busy-waiting. When getQueue() checks the queue and discovers it empty, it unlocks the mutex and then immediately attempts to re-acquire it. That re-acquisition has a good chance to succeed despite other threads waiting for the mutex, such as the one that is trying to fill the queue. The more worker threads you add, the harder it is for the main thread, running makeTask(), to queue tasks.

    This sort of problem is usually addressed with the help of a condition variable.

  • Highly inefficient queue implementation. Every time an element is removed from the queue, the whole tail of the queue is moved up so that the head is at index 0. And all of this has to be performed while holding the mutex locked, reducing the proportion of the overall work that can run in parallel. There are multiple better-performing alternatives -- a circular queue or a linked list, for example.

  • The appearance of a second critical region in execute(). It's unclear what all this manipulation of listOfArg is about, but it also limits the proportion of your program that executes in parallel. And it has similar efficiency problems to those involved in manipulating the task queue.
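
To make the first two points concrete, here is a minimal sketch of a circular task queue guarded by one mutex and two condition variables. It is not the code from the question: the names TaskQueue, queue_init(), queue_push(), queue_close() and queue_pop(), the QUEUE_CAP and PATH_LEN constants, and the layout of Task are all assumptions made for illustration, since the question does not show how Task or the queue are declared. Workers sleep in pthread_cond_wait() instead of spinning on the mutex, and removing a task only advances an index instead of shifting the array.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <string.h>

#define QUEUE_CAP 256   /* hypothetical capacity */
#define PATH_LEN  1024  /* hypothetical; the question does not show Task */

typedef struct {
    char file[PATH_LEN];
} Task;

typedef struct {
    Task items[QUEUE_CAP];
    size_t head;               /* index of the next task to remove */
    size_t count;              /* number of tasks currently queued */
    bool closed;               /* true once no more tasks will be added */
    pthread_mutex_t lock;
    pthread_cond_t not_empty;  /* signalled when a task is added or the queue closes */
    pthread_cond_t not_full;   /* signalled when a task is removed */
} TaskQueue;

void queue_init(TaskQueue *q) {
    q->head = 0;
    q->count = 0;
    q->closed = false;
    pthread_mutex_init(&q->lock, NULL);
    pthread_cond_init(&q->not_empty, NULL);
    pthread_cond_init(&q->not_full, NULL);
}

/* Producer side: sleeps while the queue is full, then appends in O(1). */
void queue_push(TaskQueue *q, const char *file) {
    pthread_mutex_lock(&q->lock);
    assert(!q->closed);  /* pushing after queue_close() is a programming error */
    while (q->count == QUEUE_CAP)
        pthread_cond_wait(&q->not_full, &q->lock);
    Task *slot = &q->items[(q->head + q->count) % QUEUE_CAP];
    strncpy(slot->file, file, PATH_LEN - 1);
    slot->file[PATH_LEN - 1] = '\0';
    q->count++;
    pthread_cond_signal(&q->not_empty);
    pthread_mutex_unlock(&q->lock);
}

/* Plays the role of endThreads = 1: wakes every sleeping worker. */
void queue_close(TaskQueue *q) {
    pthread_mutex_lock(&q->lock);
    q->closed = true;
    pthread_cond_broadcast(&q->not_empty);
    pthread_mutex_unlock(&q->lock);
}

/* Consumer side: sleeps while the queue is empty and still open.
 * Returns false once the queue is closed and fully drained. */
bool queue_pop(TaskQueue *q, Task *out) {
    pthread_mutex_lock(&q->lock);
    while (q->count == 0 && !q->closed)
        pthread_cond_wait(&q->not_empty, &q->lock);
    if (q->count == 0) {
        pthread_mutex_unlock(&q->lock);
        return false;
    }
    *out = q->items[q->head];
    q->head = (q->head + 1) % QUEUE_CAP;  /* advance the head; nothing is shifted */
    q->count--;
    pthread_cond_signal(&q->not_full);
    pthread_mutex_unlock(&q->lock);
    return true;
}
```

With a queue along these lines, each worker would loop on queue_pop() until it returns false and call execute() on every task it receives, and the main thread would call queue_close() at the point where the question sets endThreads.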

The program also exhibits some general inefficiencies that probably don't make much difference to speedup and scaling, such as additional instances of shifting arrays around one element at a time via loops instead of via memmove(), hand-rolling string operations instead of using built-in, optimized library functions such as strchr(), and performing dynamic memory allocation where automatic allocation would serve better.
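
As a rough illustration of those library alternatives, the sketch below extracts the leading path component with strchr() into a caller-supplied buffer (which can be an automatic array rather than a malloc() allocation), and removes the front element of an array with a single memmove(). The function names top_component() and remove_front() are hypothetical, and an int array stands in for the Task and Arguments arrays, whose definitions the question does not show.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Copy the leading component of `file` into `out`: everything before the
 * first '/' that is not at index 0, matching the hand-written loop in
 * execute(). The result is truncated to fit out_size and always terminated. */
void top_component(const char *file, char *out, size_t out_size) {
    assert(out_size > 0);
    /* Search from index 1 so that a leading '/' is kept. */
    const char *slash = (file[0] != '\0') ? strchr(file + 1, '/') : NULL;
    size_t len = slash ? (size_t)(slash - file) : strlen(file);
    if (len >= out_size)
        len = out_size - 1;
    memcpy(out, file, len);
    out[len] = '\0';
}

/* Drop element 0 of an n-element array by moving the tail down one slot.
 * memmove() is used because the source and destination ranges overlap. */
void remove_front(int *items, size_t n) {
    if (n > 1)
        memmove(items, items + 1, (n - 1) * sizeof items[0]);
}
```

A caller could declare char path[1024]; on the stack and pass it with sizeof path, which avoids the malloc()/free() pair in execute().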
