C++  并发无锁队列的原理与实现

您所在的位置:网站首页 无锁队列出队列顺序 C++  并发无锁队列的原理与实现

C++  并发无锁队列的原理与实现

2023-07-08 14:27| 来源: 网络整理| 查看: 265

C++  并发无锁队列的原理与实现

一般无锁队列的情况分为两种,第一种是单个消费者与单个生产者,第二种是多个消费者或者多个生产着的情况。

一.单个消费者与单个生产者的情况

这种情况下可以用环形队列RingBuffer来实现无锁队列,比如dpdk和kfifo的无锁队列就是用环形队列实现的,kfifo里面的入队和出队的处理很巧妙,大家可以去看看。 DPDK:https://blog.csdn.net/s2603898260/article/details/109565922 KFIFO:https://blog.csdn.net/weixin_30426957/article/details/96164037

用数组模拟环形队列举个简单的例子:

#include #include #include #include using namespace std; #define RING_QUEUE_SIZE 10 template class RingBuffer { public: RingBuffer(int size): m_size(size),m_head(0), m_tail(0) { m_buf = new T[size]; } ~RingBuffer() { delete [] m_buf; m_buf = NULL; } inline bool isEmpty() const { return m_head == m_tail; } inline bool isFull() const { return m_tail == (m_head + 1) % m_size; //取模是为了考虑队列尾的特殊情况 } bool push(const T& value) { //以实例的方式传值 if(isFull()) { return false; } m_buf[m_head] = value; m_head = (m_head + 1) % m_size; return true; } bool push(const T* value) { //以指针的方式传值 if(isFull()) { return false; } m_buf[m_head] = *value; m_head = (m_head + 1) % m_size; return true; } inline bool pop(T& value) { if(isEmpty()) { return false; } value = m_buf[m_tail]; m_tail = (m_tail + 1) % m_size; return true; } inline unsigned int head()const { return m_head; } inline unsigned int tail()const { return m_tail; } inline unsigned int size()const { return m_size; } private: int m_size; // 队列大小 int m_head; // 队列头部索引 int m_tail; // 队列尾部索引 T* m_buf; // 队列数据缓冲区 }; typedef struct Node { //任务节点 int cmd; void *value; //可根据情况改变 }taskNode; #if 1 void produce(RingBuffer* rqueue) { int i = 0; for(i=0;ipush(node); } } void consume(RingBuffer* rqueue) { while(!rqueue->isEmpty()) { taskNode node; rqueue->pop(node); } } #endif int main() { RingBuffer * rqueue = new RingBuffer(RING_QUEUE_SIZE); std::thread producer(produce,rqueue); std::thread consumer(consume,rqueue); producer.join(); consumer.join(); delete rqueue; return 0; } 二.多个生产者或者多个消费者的情况

这种情况一般都是出现在多线程开发的场合,会有多个对象同时操作队列,此时为了避免这种情况,可以有两种方式,第一种就是最常见的加锁,第二种就是无锁队列,用原子操作实现,无锁数据结构依赖很重要的技术就是CAS操作——Compare & Set,或是 Compare & Swap,基本的思路就是先与内存中的值进行比较,如果相同就进行set或者swap操作。

常见的gcc内置原子操作有下面这些。

type __sync_fetch_and_add (type *ptr, type value, ...) // 将value加到*ptr上,结果更新到*ptr,并返回操作之前*ptr的值type __sync_fetch_and_sub (type *ptr, type value, ...) // 从*ptr减去value,结果更新到*ptr,并返回操作之前*ptr的值type __sync_fetch_and_or (type *ptr, type value, ...) // 将*ptr与value相或,结果更新到*ptr, 并返回操作之前*ptr的值type __sync_fetch_and_and (type *ptr, type value, ...) // 将*ptr与value相与,结果更新到*ptr,并返回操作之前*ptr的值type __sync_fetch_and_xor (type *ptr, type value, ...) // 将*ptr与value异或,结果更新到*ptr,并返回操作之前*ptr的值type __sync_fetch_and_nand (type *ptr, type value, ...) // 将*ptr取反后,与value相与,结果更新到*ptr,并返回操作之前*ptr的值type __sync_add_and_fetch (type *ptr, type value, ...) // 将value加到*ptr上,结果更新到*ptr,并返回操作之后新*ptr的值type __sync_sub_and_fetch (type *ptr, type value, ...) // 从*ptr减去value,结果更新到*ptr,并返回操作之后新*ptr的值type __sync_or_and_fetch (type *ptr, type value, ...) // 将*ptr与value相或, 结果更新到*ptr,并返回操作之后新*ptr的值type __sync_and_and_fetch (type *ptr, type value, ...) // 将*ptr与value相与,结果更新到*ptr,并返回操作之后新*ptr的值type __sync_xor_and_fetch (type *ptr, type value, ...) // 将*ptr与value异或,结果更新到*ptr,并返回操作之后新*ptr的值type __sync_nand_and_fetch (type *ptr, type value, ...) // 将*ptr取反后,与value相与,结果更新到*ptr,并返回操作之后新*ptr的值bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...) // 比较*ptr与oldval的值,如果两者相等,则将newval更新到*ptr并返回truetype __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...) // 比较*ptr与oldval的值,如果两者相等,则将newval更新到*ptr并返回操作之前*ptr的值__sync_synchronize (...) // 发出完整内存栅栏type __sync_lock_test_and_set (type *ptr, type value, ...) // 将value写入*ptr,对*ptr加锁,并返回操作之前*ptr的值。即,try spinlock语义void __sync_lock_release (type *ptr, ...) // 将0写入到*ptr,并对*ptr解锁。即,unlock spinlock语义

下面就用环形数组来实现无锁队列,举个例子,当然用环形链表也可以,但是链表要考虑到一个ABA的问题(这个后面再介绍分析)。

#include #include #include #include #include using namespace std; #define RING_QUEUE_SIZE 10 //std::mutex mtx; template class RingBuffer { public: RingBuffer(int size): m_size(size),m_head(0), m_tail(0) { m_buf = new T[size]; } ~RingBuffer() { delete [] m_buf; m_buf = NULL; } inline bool isEmpty() const { return m_head == m_tail; } inline bool isFull() const { return m_tail == (m_head + 1) % m_size; //取模是为了考虑队列尾的特殊情况 } /*使用互斥锁实现并发情况共用队列*/ # if 0 bool mut_push(const T& value); bool mut_push(const T* value); bool mut_pop(T& value); #endif /*使用原子操作实现并发情况无锁队列*/ bool cas_push(const T& value); bool cas_push(const T* value); bool cas_pop(T& value); inline unsigned int head()const { return m_head; } inline unsigned int tail()const { return m_tail; } inline unsigned int size()const { return m_size; } private: int m_size; // 队列大小 int m_head; // 队列头部索引 int m_tail; // 队列尾部索引 T* m_buf; // 队列数据缓冲区 }; # if 0 //互斥锁实现 template bool RingBuffer::mut_push(const T& value) { while (mtx.try_lock()) { if(isFull()) { return false; } m_buf[m_head] = value; m_head = (m_head + 1) % m_size; mtx.unlock(); return true; } } template bool RingBuffer::mut_push(const T* value) { while (mtx.try_lock()) { if(isFull()) { return false; } m_buf[m_head] = *value; m_head = (m_head + 1) % m_size; mtx.unlock(); return true; } } template bool RingBuffer::mut_pop(T& value) { while (mtx.try_lock()) { if(isEmpty()) { return false; } value = m_buf[m_tail]; m_tail = (m_tail + 1) % m_size; mtx.unlock(); return true; } } #endif template bool RingBuffer::cas_push(const T& value) { if(isFull()) { return false; } int oldValue,newValue; do{ oldValue = m_head; newValue = (oldValue + 1) % m_size; }while(__sync_bool_compare_and_swap(&m_head,oldValue,newValue) != true); m_buf[oldValue] = value; return true; } template bool RingBuffer::cas_push(const T* value) { if(isFull()) { return false; } int oldValue,newValue; do{ oldValue = m_head; newValue = (oldValue + 1) % m_size; }while(__sync_bool_compare_and_swap(&m_head,oldValue,newValue) != true); m_buf[oldValue] = *value; return true; } template bool RingBuffer::cas_pop(T& value) { if(isEmpty()) { return false; } int oldValue,newValue; do{ oldValue = m_tail; newValue = (oldValue + 1) % m_size; }while(__sync_bool_compare_and_swap(&m_tail,oldValue,newValue) != true); value = m_buf[oldValue]; return true; } typedef struct Node { //任务节点 int cmd; void *value; }taskNode; void produce(RingBuffer* rqueue) { int i = 0; for(i=0;icas_push(node); } } void consume(RingBuffer* rqueue) { while(!rqueue->isEmpty()) { taskNode node; rqueue->cas_pop(node); } } int main() { int i = 0; RingBuffer * rqueue = new RingBuffer(RING_QUEUE_SIZE); std::thread producer[20]; std::thread consumer[20]; for(i = 0; i


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3