Multi-threaded parallel counter is slower than a simple concurrent lock-based counter


I was comparing the performance of an approximate (sloppy) counter and a simple concurrent counter, following the book's comparison of a single concurrent counter vs. an approximate counter.

Here is the code I wrote to measure the timings.

concurrent-counter.c:

#define _GNU_SOURCE 
#include <stdio.h>
#include <pthread.h>
#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <sched.h>
#include <unistd.h>
#include "measure-time.h"

typedef struct __counter_t {
    int             value;
    pthread_mutex_t lock;
} counter_t;

typedef struct __worker_params {
    counter_t   *counter;
    int         count;
} worker_params;

void init(counter_t *counter) {
    counter->value = 0;
    assert(pthread_mutex_init(&counter->lock, NULL) == 0);
}

void increment(counter_t *counter, int loop){
    for (int i = 0; i < loop; i++) {
        assert(pthread_mutex_lock(&counter->lock) == 0);
        counter->value++;
        assert(pthread_mutex_unlock(&counter->lock) == 0);
    }
}

void *worker(void *args) {
    worker_params   *w_args = (worker_params *) args;
    increment(w_args->counter, w_args->count);
    return NULL;
}

int     main(int argc, char *argv[]) 
{
    int             max_num_of_cpu;
    int             num_of_threads;
    int             count;
    char            one_thread_per_cpu;
    pthread_t       *threads;
    pthread_attr_t  *thread_attrs;
    cpu_set_t       *cpu_sets;
    counter_t       counter;
    worker_params   w_args;

    if (argc != 4) {
        printf ("please enter three arguments : number of threads, increase count, one_thread_per_cpu\n");
        return -1;
    }

    num_of_threads= atoi(argv[1]);
    count = atoi(argv[2]);
    one_thread_per_cpu = strcmp(argv[3], "true") == 0 ? 1 : 0;
    max_num_of_cpu = sysconf(_SC_NPROCESSORS_CONF);
    if (one_thread_per_cpu == 1) assert( num_of_threads <= max_num_of_cpu);
    threads = malloc(sizeof(pthread_t)*num_of_threads);
    thread_attrs = malloc(sizeof(pthread_attr_t)*num_of_threads);
    cpu_sets = malloc(sizeof(cpu_set_t)*max_num_of_cpu);
    assert(threads != NULL && thread_attrs != NULL && cpu_sets != NULL);

    init(&counter);
    w_args.counter = &counter;
    w_args.count = count / num_of_threads;
    for (int i = 0; i < num_of_threads; i++)
    {
        CPU_ZERO(cpu_sets + i);
        CPU_SET(i, cpu_sets + i);
        pthread_attr_init(thread_attrs + i);
        if (one_thread_per_cpu == 1) 
            pthread_attr_setaffinity_np(thread_attrs + i, sizeof(cpu_set_t), cpu_sets + i);
        else
            // bind thread to first cpu
            pthread_attr_setaffinity_np(thread_attrs + i, sizeof(cpu_set_t), cpu_sets);
    }

    start_timer();
    for (int i = 0; i < num_of_threads; i++)
        pthread_create(threads + i, thread_attrs + i, worker, &w_args);
    for (int i = 0; i < num_of_threads; i++)
        pthread_join(threads[i], NULL);
    end_timer();
    printf(one_thread_per_cpu == 1 ? "one thread per cpu\n" : "running on only one cpu\n");
    printf("number of threads: %d, total increase count: %d, total time %fs, final value : %d\n", num_of_threads, count, get_elapsed_seconds(), counter.value);
    
    free(threads);
    for (int i = 0; i < num_of_threads; i++)
        pthread_attr_destroy(thread_attrs + i);
    free(thread_attrs);
    free(cpu_sets);
}

sloppy-counter.c:

#define _GNU_SOURCE 
#include <stdio.h>
#include <pthread.h>
#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <sched.h>
#include <unistd.h>
#include "measure-time.h"

typedef struct __counter_t {
    int             threshold;
    int             num_of_cpu;
    int             global_value;
    pthread_mutex_t global_lock;
    int             *local_values;
    pthread_mutex_t *local_locks;
} counter_t;

typedef struct __worker_params {
    counter_t   *counter;
    int         count;
    int         cpu_id;
} worker_params;

void init(counter_t *counter, int threshold) {
    counter->threshold = threshold;
    counter->num_of_cpu = sysconf(_SC_NPROCESSORS_CONF);
    counter->global_value = 0;
    counter->local_values = malloc(sizeof(int)*counter->num_of_cpu);
    counter->local_locks = malloc(sizeof(pthread_mutex_t)*counter->num_of_cpu);
    assert(counter->local_values != NULL && counter->local_locks != NULL);
    assert(pthread_mutex_init(&counter->global_lock, NULL) == 0);
    for (int i=0; i < counter->num_of_cpu; i++){
        assert(pthread_mutex_init(counter->local_locks + i, NULL) == 0);
        counter->local_values[i] = 0;
    }
}

void increment(counter_t *counter, int cpu_id) {
    assert(pthread_mutex_lock(counter->local_locks + cpu_id) == 0);
    counter->local_values[cpu_id]++;
    if (counter->local_values[cpu_id] >= counter->threshold){
        assert(pthread_mutex_lock(&counter->global_lock) == 0);
        counter->global_value += counter->local_values[cpu_id];
        assert(pthread_mutex_unlock(&counter->global_lock) == 0);
        counter->local_values[cpu_id] = 0;
    }
    assert(pthread_mutex_unlock(counter->local_locks + cpu_id) == 0);
}

int get_value(counter_t *counter) {
    int global_value;
    assert(pthread_mutex_lock(&counter->global_lock) == 0);
    global_value = counter->global_value;
    assert(pthread_mutex_unlock(&counter->global_lock) == 0);
    return global_value;
}

void *worker(void *args) {
    worker_params   *w_args = (worker_params *) args;
    for (int i = 0; i < w_args->count; i++) increment(w_args->counter, w_args->cpu_id);
    return NULL;
}

void worker_params_init(worker_params *w_args, counter_t *counter, int count, int cpu_id){
    w_args->counter = counter;
    w_args->count = count;
    w_args->cpu_id = cpu_id;
}

int     main(int argc, char *argv[]) 
{
    int             num_of_cpus;
    int             num_of_threads;
    int             count;
    int             threshold;
    pthread_t       *threads;
    pthread_attr_t  *thread_attrs;
    cpu_set_t       *cpu_sets;
    counter_t       counter;
    worker_params   *w_args;

    if (argc != 4) {
        printf ("please enter three arguments : number of threads, increase count, threshold value\n");
        return -1;
    }

    num_of_cpus = sysconf(_SC_NPROCESSORS_CONF);
    num_of_threads= atoi(argv[1]);
    count = atoi(argv[2]);
    threshold = atoi(argv[3]);
    threads = malloc(sizeof(pthread_t)*num_of_threads);
    thread_attrs = malloc(sizeof(pthread_attr_t)*num_of_cpus);
    cpu_sets = malloc(sizeof(cpu_set_t)*num_of_cpus);
    w_args = malloc(sizeof(worker_params)*num_of_cpus);
    assert(threads != NULL && thread_attrs != NULL && cpu_sets != NULL);

    init(&counter, threshold);
    for (int i = 0; i < num_of_cpus; i++){
        CPU_ZERO(cpu_sets + i);
        CPU_SET(i, cpu_sets + i);
        worker_params_init(w_args + i, &counter, count/num_of_threads, i);
        pthread_attr_init(thread_attrs + i);
    }
    for (int i = 0; i < num_of_threads; i++)
        pthread_attr_setaffinity_np(thread_attrs + i%num_of_cpus, sizeof(cpu_set_t), cpu_sets + i);

    start_timer();
    for (int i = 0; i < num_of_threads; i++)
        pthread_create(threads + i, thread_attrs + i%num_of_cpus, worker, w_args + i%num_of_cpus);
    for (int i = 0; i < num_of_threads; i++)
        pthread_join(threads[i], NULL);
    end_timer();

    if (num_of_threads == 1) printf("\nthreshold : %d\n", threshold);
    printf("threads: %d   time: %fs    global: %d\n", num_of_threads, get_elapsed_seconds(), get_value(&counter));

    for (int i=0; i < num_of_cpus; i++)
        pthread_attr_destroy(thread_attrs + i);
    free(threads);
    free(thread_attrs);
    free(counter.local_values);
    free(counter.local_locks);
    free(cpu_sets);
    free(w_args);
}

measure-time.c:

#include "measure-time.h"

void        start_timer()
{
    clock_gettime(CLOCK_REALTIME, &start);
}

void        end_timer()
{
    clock_gettime(CLOCK_REALTIME, &end);
}

float       get_elapsed_seconds()
{
    return end.tv_sec + end.tv_nsec/1E9 - start.tv_sec - start.tv_nsec/1E9;
}

long long   get_elapsed_nano_seconds()
{
    return end.tv_sec*1E9 + end.tv_nsec - start.tv_sec*1E9 - start.tv_nsec;
}

Any help would be greatly appreciated. Thank you.

CodePudding user response:

First of all, local_values and local_locks are allocated so that multiple items can share the same cache line. This is a problem when multiple threads access them, because the cache-coherence protocol causes a cache-line bouncing effect between the cores modifying the same line. This problem is known as false sharing. You can remove this effect by aligning each item to the cache-line size of the target architecture (typically 64 bytes on x86-64 processors). Note that the data structure takes more space because of the significant padding between items, but performance matters more here. This improves the performance of the sloppy version by about 50%-70% on my machine (with the arguments 6 12000000 1048576).
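As a rough illustration (my own sketch, not the exact structure used above), the per-CPU slots can be forced onto separate cache lines like this, assuming 64-byte lines and a GCC/Clang-style alignment attribute:

#define CACHE_LINE 64

/* one slot per CPU, aligned so that two cores never write to the same cache line */
typedef struct {
    int             value;
    pthread_mutex_t lock;
} __attribute__((aligned(CACHE_LINE))) local_slot_t;

typedef struct __counter_t {
    int             threshold;
    int             num_of_cpu;
    int             global_value;
    pthread_mutex_t global_lock;
    local_slot_t    *locals;   /* e.g. aligned_alloc(CACHE_LINE, num_of_cpu * sizeof(local_slot_t)) */
} counter_t;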

Additionally, note that the sloppy version is only useful if the threshold is large, close to the maximum value, so that the global lock is rarely taken. Otherwise, it just introduces more overhead than the alternative version, because the global lock is used the same way in both cases and it is the bottleneck.
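To put rough numbers on it: the global lock is taken about once every threshold local increments, so with the arguments above (12,000,000 total increments, threshold 1,048,576) it is acquired on the order of 12,000,000 / 1,048,576 ≈ 11 times in total, versus 12,000,000 times for the simple counter. With a small threshold such as 16, that grows to roughly 750,000 acquisitions and the global lock dominates again.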

Furthermore, the threads do not start at the same time, and they only slow each other down while they are working on the same resource (i.e. the lock). A lock is very cheap when only one thread uses it. I can clearly see that some threads run for 0.15 seconds while others run for 0.35 seconds (especially with the sloppy version). Put shortly, the benchmark is flawed.
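One way to reduce that effect (a sketch only, reusing the structures from concurrent-counter.c above) is to make every worker wait on a barrier so that all threads begin the timed section together:

pthread_barrier_t start_barrier;   /* pthread_barrier_init(&start_barrier, NULL, num_of_threads) before creating the threads */

void *worker(void *args) {
    worker_params *w_args = (worker_params *) args;
    pthread_barrier_wait(&start_barrier);   /* blocks until all workers have reached this point */
    increment(w_args->counter, w_args->count);
    return NULL;
}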

Finally, the thread binding does not work, at least not on my machine: with the concurrent version, the OS schedules nearly all the threads on the same core, so that version can be significantly faster (since there is then effectively no contention on the lock). This can be seen with multi-threading profiling tools. This is much less the case with the sloppy version, which is therefore more exposed to the cache-line bouncing effect and so to worse performance.
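A quick way to check this (my own sketch, Linux/glibc specific, reusing the sloppy version's worker_params which carries a cpu_id) is to re-pin each worker from inside the thread and print where it actually runs:

void *worker(void *args) {
    worker_params *w_args = (worker_params *) args;

    cpu_set_t set;
    CPU_ZERO(&set);
    CPU_SET(w_args->cpu_id, &set);
    /* bind this thread explicitly instead of relying only on the creation attribute */
    pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &set);

    printf("worker %d is running on cpu %d\n", w_args->cpu_id, sched_getcpu());
    for (int i = 0; i < w_args->count; i++) increment(w_args->counter, w_args->cpu_id);
    return NULL;
}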


Additional notes and discussion

A low-level profile shows that the bottleneck of the concurrent version is in libpthread-2.33.so:

  50,19% __pthread_mutex_unlock_usercnt
  42,39% __pthread_mutex_lock

This profile includes kernel function calls, so there is almost no context-switch overhead, and the OS is smart enough to schedule the threads on the same core so that there is no false-sharing overhead in this case on my machine. There are a few more scheduling issues with the sloppy version, but in practice the profiling results are similar. The thing is, the sloppy version uses twice as many locks, so it is about twice as slow on my machine.
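For reference, a profile like the one above can be collected with Linux perf, e.g. `perf record -g ./concurrent-counter 6 12000000 true` followed by `perf report` (example invocation only; adjust the binary name and arguments to your build).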

This is not representative of real-world applications, because in practice an application does more computation, so the scheduler needs to spread the threads over different cores so that the work can be executed more efficiently (at the expense of slower counter operations).

If you want to speed this up, you can remove the global lock and just read the per-thread values when you need the total. This makes reading the value slower but makes the increment faster. Using atomic operations might be faster still, because the thread reading the values of the other threads does not need to lock anything, and a thread incrementing its own atomic value should not be slowed down since its cache line is not invalidated.
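A minimal sketch of that idea with C11 atomics (my interpretation of the suggestion, not the book's design): each thread increments only its own padded slot, and a reader sums the slots without taking any lock:

#include <stdatomic.h>

#define MAX_THREADS 64
#define CACHE_LINE  64

typedef struct {
    _Atomic long value;
    char         pad[CACHE_LINE - sizeof(_Atomic long)];  /* keep slots on separate cache lines */
} slot_t;

static slot_t slots[MAX_THREADS];

/* each thread only touches slots[id], so there is no lock and no line bouncing */
void increment(int id) {
    atomic_fetch_add_explicit(&slots[id].value, 1, memory_order_relaxed);
}

/* an approximate read: sum the slots while the writers keep going */
long get_value(int nthreads) {
    long sum = 0;
    for (int i = 0; i < nthreads; i++)
        sum += atomic_load_explicit(&slots[i].value, memory_order_relaxed);
    return sum;
}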

CodePudding user response:

It's logically not possible; it looks more like a memory-access problem than a compute-time problem.
