C++并发编程

您所在的位置:网站首页 cpp线程安全问题 C++并发编程

C++并发编程

2024-07-03 23:50| 来源: 网络整理| 查看: 265

采用计数器的方式防止条件竞争。

template class lock_free_queue { private: struct node; struct counted_node_ptr { int external_count;//记录外部线程通过指针的引用 node* ptr; }; std::atomic head; std::atomic tail; // 1 //计数器 struct node_counter { unsigned internal_count : 30; unsigned external_counters : 2; // 大小不会超过2,同时最多由next,和head或者tail指向。 }; struct node { std::atomic data; std::atomic count; // 3 counted_node_ptr next; node() { node_counter new_count; new_count.internal_count = 0; new_count.external_counters = 2; // next 和 tail 同时指向,因为新创建的节点将会放入到队列中 count.store(new_count); next.ptr = nullptr; next.external_count = 0; } /* 释放对当前节点的引用,释放节点引用时语言将该节点得到内部计数减一, 如果减一后节点的外部引用以及内部引用计数为零,则删除该节点。 释放操作由节点自己完成,所以该函数为node的成员函数。 所以该函数释放的是该函数的计数器的外部引用节点。 */ void release_ref() { node_counter old_counter = count.load(std::memory_order_relaxed); node_counter new_counter; do { new_counter = old_counter; --new_counter.internal_count; // 内部计数减一 } while (!count.compare_exchange_strong( // 2 old_counter, new_counter, std::memory_order_acquire, std::memory_order_relaxed)); if (!new_counter.internal_count && !new_counter.external_counters) { delete this; // 3 } }//release_ref };//node /* 增加对对节点的外部引用计数, 同时增加两个计数指针的引用, 增加的是外部节点的引用,而非节点的引用计数器。 */ static void increase_external_count( std::atomic& counter, counted_node_ptr& old_counter) { counted_node_ptr new_counter; do { new_counter = old_counter; ++new_counter.external_count; } while (!counter.compare_exchange_strong( old_counter, new_counter, std::memory_order_acquire, std::memory_order_relaxed)); old_counter.external_count = new_counter.external_count; }//increase_external_count /* 释放一个节点的引用,同时将外部节点的引用计数加入到内部节点中, 当节点的引用计数为零时将节点删除。计数器中的外部引用也会减一 */ static void free_external_counter(counted_node_ptr & old_node_ptr) { node* const ptr = old_node_ptr.ptr; int const count_increase = old_node_ptr.external_count - 2; node_counter old_counter = ptr->count.load(std::memory_order_relaxed); node_counter new_counter; do { new_counter = old_counter; --new_counter.external_counters; // 1 计数器外部节点引用减一,此时tail和next已经没有指向该节点 new_counter.internal_count += count_increase; // 2 将外部节点引用的计数加到内部节点引用 } while (!ptr->count.compare_exchange_strong( // 3 old_counter, new_counter, std::memory_order_acquire, std::memory_order_relaxed)); if (!new_counter.internal_count && !new_counter.external_counters) { delete ptr; // 如果引用为零,便删除节点 } }//free_external_counter public: /* 数据进对队列成功后会将计数器的外部计数减一, 因为此时只有一个next指向节点 */ void push(T new_value) { std::unique_ptr new_data(new T(new_value)); counted_node_ptr new_next; new_next.ptr = new node; new_next.external_count = 1; counted_node_ptr old_tail = tail.load(); for (;;) { increase_external_count(tail, old_tail); // 增加外部节点的引用(非计数器) T* old_data = nullptr; if (old_tail.ptr->data.compare_exchange_strong( // 6 old_data, new_data.get())) { old_tail.ptr->next = new_next; old_tail = tail.exchange(new_next); free_external_counter(old_tail); // 此时内部计数的外部计数将减一 new_data.release(); break; } old_tail.ptr->release_ref();//指针离开节点则内部计数减一 } }//push std::unique_ptr pop() { counted_node_ptr old_head = head.load(std::memory_order_relaxed); // 1 for (;;) { increase_external_count(head, old_head); // 增加节点引用(非计数器引用) node* const ptr = old_head.ptr; if (ptr == tail.load().ptr)//条件满足则队列为空 { ptr->release_ref(); // 3 return std::unique_ptr(); } if (head.compare_exchange_strong(old_head, ptr->next)) // 条件满足则取出数据 { T* const res = ptr->data.exchange(nullptr);//取出数据 free_external_counter(old_head); //释放计数器节点 return std::unique_ptr(res); } ptr->release_ref(); // 指针离开节点,则内部数据减一 } }//pop };

在push操作中,代码 6 ,if (old_tail.ptr->data.compare_exchange_strong( old_data, new_data.get()))条件失败就会导致一直循环,从而导致cpu一直被占用,因此,这种情况实际上类似于自旋锁,这将使得效率非常的低,因此,我们可以想办法让忙等待的线程帮助其他线程完成操作,因而减少忙等带来的浪费。

template class lock_free_queue { private: struct node { std::atomic data; std::atomic count; std::atomic next; // 1 }; public: std::unique_ptr pop() { counted_node_ptr old_head=head.load(std::memory_order_relaxed); for(;;) { increase_external_count(head,old_head); node* const ptr=old_head.ptr; if(ptr==tail.load().ptr) { return std::unique_ptr(); } counted_node_ptr next=ptr->next.load(); // 2 if(head.compare_exchange_strong(old_head,next)) { T* const res=ptr->data.exchange(nullptr); free_external_counter(old_head); return std::unique_ptr(res); } ptr->release_ref(); } } /* 更新尾节点 */ void set_new_tail(counted_node_ptr& old_tail, // 1 counted_node_ptr const& new_tail) { node* const current_tail_ptr = old_tail.ptr; while (!tail.compare_exchange_weak(old_tail, new_tail) && old_tail.ptr == current_tail_ptr);//如果没有线程修改尾节点或者pop尾节点,那么修改尾节点 if (old_tail.ptr == current_tail_ptr) // 3 free_external_counter(old_tail); // 4 else current_tail_ptr->release_ref(); // 5 } public: //提高性能的push void push(T new_value) { std::unique_ptr new_data(new T(new_value)); counted_node_ptr new_next; new_next.ptr = new node; new_next.external_count = 1; counted_node_ptr old_tail = tail.load(); for (;;) { increase_external_count(tail, old_tail); T* old_data = nullptr; if (old_tail.ptr->data.compare_exchange_strong( // 6 将数据放入到原来的节点中 old_data, new_data.get())) { counted_node_ptr old_next = { 0 };//old_next 应该为空 if (!old_tail.ptr->next.compare_exchange_strong( // 7 old_next, new_next))//如果原来的next不空,则说明已经有其他线程已经修改尾指针 { delete new_next.ptr; // 8 此时数据已经push成功,因此释放new出的节点 new_next = old_next; // 9 new_next也应该置空 } set_new_tail(old_tail, new_next);//修改tail指针 new_data.release(); break; } else // 10 说明其他线程已经修改数据 { counted_node_ptr old_next = { 0 }; if (old_tail.ptr->next.compare_exchange_strong( // 如果还未修改尾指针,则由该线程修改 old_next, new_next))//此处是复制操作,非指针操作,但是ptr指针为浅拷贝 { old_next = new_next; // 12 old_next 指向的node和old_tail指向的是同一个 new_next.ptr = new node; // 13 因为该线程new的节点已经称为了tail,因此,需要重新new } set_new_tail(old_tail, old_next); // 更新节点指针,tail指向下一个节点 } } } };


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3