I use posix_memalign() to alloc place to put my pointers, but when i try to free this place, it comes to this:
*** Error in `/home/liqiaochu/lockless_rb/test_lockless_rb': free(): invalid next size (fast): 0x000000000060b000 ***
my code is at blow: when call rb.destroy(), it will abort
lockless_rb.h
#pragma once
#include <stdint.h>
#include <atomic>
enum rb_sync_type
{
RB_SYNC_ST, // single thread
RB_SYNC_MT, // multi thread
};
struct rb_headtail
{
volatile std::atomic<uint64_t> head{0};
volatile std::atomic<uint64_t> tail{0};
};
class lockless_rb
{
public:
// ret:success:0 failed:-1
int create(int count);
void destroy();
int enqueue_one_elem(void* elem, int rb_sync_type);
int dequeue_one_elem(void** elem, int rb_sync_type);
private:
// one blk a time
int __move_prod_head(uint64_t* old_head, int rb_sync_type);
void __enqueue_one_elem_data(uint64_t old_head, void* elem);
void __update_prod_tail(uint64_t old_head, int rb_sync_type);
int __move_cons_head(uint64_t* old_head, int rb_sync_type);
void __dequeue_one_elem_data(uint64_t old_head, void** elem);
void __update_cons_tail(uint64_t old_head, int rb_sync_type);
void* ptr_mem_ = nullptr;
rb_headtail prod_;
rb_headtail cons_;
uint64_t size_ = 0;
uint64_t mask_ = 0;
};
lockless_rb.cpp
#include "include/lock_less_rb.h"
#include <stdio.h>
#include <stdlib.h>
#include <emmintrin.h>
int lockless_rb::create(int count)
{
int ret = 0;
size_t page_sz = 4096;
size_t page_cnt = (count * sizeof(void*)) / page_sz 1;
ret = posix_memalign(&ptr_mem_, page_sz, page_cnt);
printf("%d\n", ret);
// ptr_mem_ = malloc(sizeof(void*) * count);
// if (nullptr == ptr_mem_)
// return -1;
size_ = count;
mask_ = count - 1;
return 0;
}
void lockless_rb::destroy()
{
free(ptr_mem_);
}
int lockless_rb::enqueue_one_elem(void* elem, int rb_sync_type)
{
uint64_t prod_head;
int n = __move_prod_head(&prod_head, rb_sync_type);
if (0 == n)
return -1;
__enqueue_one_elem_data(prod_head, elem);
__update_prod_tail(prod_head, rb_sync_type);
return 0;
}
int lockless_rb::dequeue_one_elem(void** elem, int rb_sync_type)
{
uint64_t cons_head;
int n = __move_cons_head(&cons_head, rb_sync_type);
if (0 == n)
return -1;
__dequeue_one_elem_data(cons_head, elem);
__update_cons_tail(cons_head, rb_sync_type);
return 0;
}
// private:
int lockless_rb::__move_cons_head(uint64_t* old_head, int rb_sync_type)
{
bool success = false;
int free_entries = 0;
while (success == false) {
*old_head = cons_.head;
asm volatile ("" : : : "memory");
free_entries = (prod_.tail - *old_head);
if (free_entries < 1)
return 0; // success read nums
if (rb_sync_type == 0) {
cons_.head = *old_head 1;
asm volatile ("" : : : "memory");
success = true;
} else {
success = cons_.head.compare_exchange_weak(*old_head, *old_head 1);
}
}
return 1;
}
void lockless_rb::__dequeue_one_elem_data(uint64_t old_head, void** elem)
{
uint64_t i;
uint64_t idx = old_head & mask_;
uint64_t* ptr_mem64 = (uint64_t*)ptr_mem_;
uint64_t* elem_64 = (uint64_t*)(elem);
*elem = reinterpret_cast<void*>(ptr_mem64[idx]);
}
void lockless_rb::__update_cons_tail(uint64_t old_head, int rb_sync_type)
{
asm volatile ("" : : : "memory");
if (0 != rb_sync_type) {
while (cons_.tail != old_head) { // faster than while(1)
_mm_pause();
}
}
cons_.tail = old_head 1;
}
int lockless_rb::__move_prod_head(uint64_t* old_head, int rb_sync_type)
{
bool success = false;
int free_entries = 0;
while (success == false) {
*old_head = prod_.head;
asm volatile ("" : : : "memory");
free_entries = (size_ cons_.tail - *old_head);
if (free_entries < 1)
return 0;
if (rb_sync_type == 0) {
prod_.head = *old_head 1;
asm volatile ("" : : : "memory");
success = true;
} else {
success = prod_.head.compare_exchange_weak(*old_head, *old_head 1);
}
}
return 1;
}
void lockless_rb::__enqueue_one_elem_data(uint64_t old_head, void* elem)
{
uint64_t i;
uint64_t idx = old_head & mask_;
uint64_t* ptr_mem64 = (uint64_t*)ptr_mem_;
ptr_mem64[idx] = reinterpret_cast<uint64_t>(elem);
}
void lockless_rb::__update_prod_tail(uint64_t old_head, int rb_sync_type)
{
asm volatile ("" : : : "memory");
if (0 != rb_sync_type) {
while (prod_.tail != old_head) {
_mm_pause();
}
}
prod_.tail = old_head 1;
}
test code:
#include "include/lock_less_rb.h"
#include <thread>
#include <vector>
#include <iostream>
int main()
{
lockless_rb rb;
rb.create(4); // rb size = 4
int cons_num = 2;
int enqueue_num = 0;
int loop_times = 100;
std::atomic_int dequeue_num(0);
char msg[20] = "hello_world";
std::vector<std::thread> thrs;
for (auto i = 0; i < cons_num; i ) {
thrs.emplace_back([&]{
while (1) {
if (loop_times == dequeue_num) {
printf("dequeue %d\n", dequeue_num.load());
break;
}
void* msg1;
if (rb.dequeue_one_elem(&msg1, rb_sync_type::RB_SYNC_MT) != 0)
continue;
else {
dequeue_num ;
}
}
});
}
thrs.emplace_back([&]{
while (1) {
if (loop_times == enqueue_num) {
printf("enqueue %d\n", enqueue_num);
break;
}
if (rb.enqueue_one_elem(msg, rb_sync_type::RB_SYNC_ST) != 0)
continue;
else {
// printf("%s, %d\n", (char*)(msg), enqueue_num);
enqueue_num ;
}
}
});
for (auto& t:thrs) {
t.join();
}
rb.destroy();
return 0;
}
thankyou!!!
CodePudding user response:
Within this call:
rb.create(4);
the parameter count
of create
member function has value 4. According to
size_t page_cnt = (count * sizeof(void*)) / page_sz 1;
page_cnt
has value 1. In
ret = posix_memalign(&ptr_mem_, page_sz, page_cnt);
you ask posix_memalign
to allocate page_cnt
bytes, therefore, just 1 byte, according to the documentation:
int posix_memalign(void **memptr, size_t alignment, size_t size);
The function
posix_memalign()
allocatessize
bytes...
Don't you want to allocate page_cnt
pages instead? If so, change the call to:
ret = posix_memalign(&ptr_mem_, page_sz, page_cnt * page_sz);