C++ 无锁队列

     2023-02-22     162

关键词:

【中文标题】C++ 无锁队列【英文标题】:C++ Lock-Free Queue 【发布时间】:2020-07-26 16:04:49 【问题描述】:

我设计了这个函数,用来实现Lock-Free队列,但是在实际执行过程中(dequeue)存在死锁问题。我检查了很多次,我认为这很好。 我在 x86 平台上运行,有 12 个线程可以读写。

现在我想弄清楚是什么导致了这种情况,我想知道这是否是线程安全的设计,还是需要继续优化以获得更高的性能。

12 个线程出队和 12 个线程入队。 开发工具:Visual Studio 2019 我非常期待您的回复。谢谢你。


#include <iostream>
#include <functional>

#include<atomic>
#include<cassert>
#include<thread>
#include<vector>
template<typename T>
class mpmc_queue_t

public:
    mpmc_queue_t(size_t size) :
        _size(size),
        _mask(_size - 1),
        _buffer((node_t*)(new aligned_node_t[_size]))
    
        assert((_size != 0) && ((_size & (~_size + 1)) == _size));
        _read.store(0, std::memory_order_relaxed);
        _write.store(0, std::memory_order_relaxed);
        for (size_t i = 0; i < _size; ++i)
        
            _buffer[i].status.store(false, std::memory_order_relaxed);
        
    
    ~mpmc_queue_t()
    
        delete[] _buffer;
    
    bool enqueue(const T& data)
    
        auto write = _write.fetch_add(1, std::memory_order_relaxed);
        node_t* node = &_buffer[write & _mask];
        while (true)
        
            if (!node->status.load(std::memory_order_acquire))
            
                node->data = data;
                node->status.store(true, std::memory_order_release);
                return true;
            
            std::this_thread::yield();
        
    
    bool dequeue(T& data)
    
        auto read = _read.fetch_add(1, std::memory_order_relaxed);
        node_t* node = &_buffer[read & _mask];
        while (true)
        
            if (node->status.load(std::memory_order_acquire))
            
                data = node->data;
                node->status.store(false, std::memory_order_release);
                return true;
            
            std::this_thread::yield();
        
    

private:
    struct node_t
    
        T                   data;
        std::atomic_bool    status;
    ;
    typedef typename std::aligned_storage<sizeof(node_t), std::alignment_of<node_t>::value>::type aligned_node_t;
    typedef char cache_line_pad_t[64];
    cache_line_pad_t    _pad0;
    size_t              _size;
    size_t              _mask;
    node_t* const       _buffer;
    cache_line_pad_t    _pad1;
    std::atomic_size_t  _read;
    cache_line_pad_t    _pad2;
    std::atomic_size_t  _write;
    cache_line_pad_t    _pad3;
;

#define COUNT 100000000
#define THREAD 12
typedef mpmc_queue_t<size_t> queue_t;

template<typename T>
void consumer_func(T* queue)

    size_t count = COUNT;
    size_t value = 0;

    while (count > 0) 
        if (queue->dequeue(value)) 
            --count;
        
    
    std::cout << "consumer_func ID: " << std::this_thread::get_id() << " ok" << std::endl;


template<typename T>
void producer_func(T* queue)

    size_t count = COUNT;
    while (count > 0) 
        if (queue->enqueue(count)) 
            --count;
        
    
    std::cout << "producer_func ID: " << std::this_thread::get_id() << " ok" << std::endl;


template<typename T>
long double
run_test(
    T producer_func,
    T consumer_func)

    typedef std::chrono::high_resolution_clock clock_t;
    typedef std::chrono::time_point<clock_t> time_t;
    time_t start;
    time_t end;
    start = clock_t::now();
    std::thread producer0(producer_func);
    std::thread producer1(producer_func);
    std::thread producer2(producer_func);
    std::thread producer3(producer_func);
    std::thread producer4(producer_func);
    std::thread producer5(producer_func);
    std::thread producer6(producer_func);
    std::thread producer7(producer_func);
    std::thread producer8(producer_func);
    std::thread producer9(producer_func);
    std::thread producer10(producer_func);
    std::thread producer11(producer_func);

    std::thread consumer0(consumer_func);
    std::thread consumer1(consumer_func);
    std::thread consumer2(consumer_func);
    std::thread consumer3(consumer_func);
    std::thread consumer4(consumer_func);
    std::thread consumer5(consumer_func);
    std::thread consumer6(consumer_func);
    std::thread consumer7(consumer_func);
    std::thread consumer8(consumer_func);
    std::thread consumer9(consumer_func);
    std::thread consumer10(consumer_func);
    std::thread consumer11(consumer_func);

    producer0.join();
    producer1.join();
    producer2.join();
    producer3.join();
    producer4.join();
    producer5.join();
    producer6.join();
    producer7.join();
    producer8.join();
    producer9.join();
    producer10.join();
    producer11.join();

    consumer0.join();
    consumer1.join();
    consumer2.join();
    consumer3.join();
    consumer4.join();
    consumer5.join();
    consumer6.join();
    consumer7.join();
    consumer8.join();
    consumer9.join();
    consumer10.join();
    consumer11.join();
    end = clock_t::now();

    return
        (end - start).count()
        * ((double)std::chrono::high_resolution_clock::period::num
            / std::chrono::high_resolution_clock::period::den);



int main()


    
        queue_t queue(65536);
        long double seconds = run_test(std::bind(&producer_func<queue_t>, &queue),
            std::bind(&consumer_func<queue_t>, &queue));

        std::cout << "The control group completed "
            << COUNT * THREAD
            << " iterations in "
            << seconds
            << " seconds. "
            << ((long double)COUNT * THREAD / seconds) / 1000000
            << " million enqueue/dequeue pairs per second."
            << std::endl;
    

    return 0;

【问题讨论】:

除非您的缓冲区非常大,否则我看不出有任何方法可以确保在您等待时读取或写入指针不会一直环绕。其他线程可以在一次调用yield 期间执行很多 入队和出队 我会先修复一些通用的 C++ 问题,例如可复制性和可分配性以及带有警告的编译。此外,该课程以使用reinterpret_cast 的谎言开始其生命。想想这在这里做了什么以及这两种指针类型实际上有何不同!然后,提取minimal reproducible example 以在此处发布。此外,使用 cmets 描述您的策略会有所帮助,既有助于审查,也有助于理解和修正。 @UlrichEckhardt 谢谢你,我改了代码,现在可以直接运行了。 @MattTimmermans 谢谢你的回复,如果缓冲区满了就不能继续写了,因为每个节点都有一个节点状态,真:可读,假:可写。 【参考方案1】:

这种设计不是无锁的,而是“无锁”的,因为出队中的线程可能必须等待对该项目的入队操作完成(通过status 发出信号),即它不提供无锁要求的进度保证。

正如 Matt Timmermans 已经指出的那样,索引环绕时会出现问题。不保证节点的status已经更新,或者由于status上的操作不是顺序一致的,所以这个更新是否可见。这可能会导致数据竞争,因为在回绕之后,两个线程(在不同的轮次中)尝试推送到同一个节点,因为两个线程都观察到 node-&gt;status.load() 返回 false。

要解决这个问题,您可以在节点中使用计数器而不是布尔值来跟踪节点所属的当前轮次(类似于 Dmitry Vukov 在此队列中的做法:http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue)

【讨论】:

c++千万数据级别正确使用无锁队列,避免内存撕碎(代码片段)

1、内存2、时间计算类这个时间计算在我以前的文章里面都有classTicTocpublic: TicToc() tic(); voidtic() start=std::chrono::system_clock::now(); doubletoc() end=std::chrono::system_clock::now(); std::chrono::duratio 查看详情

c++并发编程----无锁实现线程安全队列(《c++concurrencyinaction》读书笔记)(代码片段)

采用计数器的方式防止条件竞争。template<typenameT>classlock_free_queueprivate:structnode;structcounted_node_ptrintexternal_count;//记录外部线程通过指针的引用node*ptr;;std::atomic<counted_node_ptr>head;std::atomic<c 查看详情

c++千万数据级别正确使用无锁队列,避免内存撕碎(代码片段)

第一篇文章这里逐渐深入,先讲一下协议,很多概念必须讲清楚七层协议我们使用的是网络传输,千万级别的数据实际上并不多,对于队列来说,每个队列的数据块大小定义为MTU大小,如1400字节,实际... 查看详情

c++编程经验(10):无锁编程其实没那么玄乎(代码片段)

文章目录atomic演示曾经有个人,问我对无锁队列的实现是怎么想的。我想了一会儿,还是纳闷儿,无锁,也能做消息队列吗?然后他让我回去好好查查。没错,他就是面试官。atomic在有些场景里面,是... 查看详情

c++编程经验(10):无锁编程其实没那么玄乎(代码片段)

文章目录atomic演示曾经有个人,问我对无锁队列的实现是怎么想的。我想了一会儿,还是纳闷儿,无锁,也能做消息队列吗?然后他让我回去好好查查。没错,他就是面试官。atomic在有些场景里面,是... 查看详情

多线程编程之无锁队列

关于无锁队列的概念与实现,可以参考博文《无锁队列的实现》,主要涉及到的知识点包括CAS原子操作、无锁队列的链表实现、无锁队列的数组实现以及ABA问题。  下面借鉴了《多线程的那点儿事(之无锁队列)》的代码,说... 查看详情

go语言无锁队列组件的实现(chan/interface/select)(代码片段)

...数量在合理范围内,对应大批量任务可以使用“协程池+无锁队列”实现。2.golang无锁队列实现思路Channel是Go中的一个核心类型,你可以把它看成一个管道,通过它并发核心单元就可以发送或者接收数据进行通讯(communication)。无锁... 查看详情

无锁队列的实现

...是在某些特殊的场景下,是可以通过优化数据结构来达到无锁的目的。那么我们就来看一下如何实现一个无锁队列。队列:众所周知,就是先进先出。出队列的时候从队列头取出一个结点;入队列的时候,将结点添加到队列尾部... 查看详情

高效无锁环形队列

在linux零拷贝技术中提到高效无锁环形队列,这篇文章分享这个知识点,这个知识点不是很复杂。环形队列多用于多线程之间数据异步传输,提高数据处理性能。下面介绍常规环形队列和高效无锁队列两种处理方式。... 查看详情

cas无锁队列的实现(代码片段)

文章目录1.基本原理2.代码实现2.1使用链表实现无锁队列2.2使用数组实现环形无锁队列3.ABA问题及解决4.参考资料1.基本原理源于1994年10月发表在国际并行与分布式会议上的论文【无锁队列的实现.pdf】。CAS(CompareAndSwap,CAS... 查看详情

没有原子的 SPSC 无锁队列

】没有原子的SPSC无锁队列【英文标题】:SPSClockfreequeuewithoutatomics【发布时间】:2014-11-2600:20:48【问题描述】:我的记录器有一个SPSC队列。它肯定不是一个通用的SPSC无锁队列。但是,考虑到关于如何使用、目标架构等的一系列... 查看详情

无锁队列

1伪命题这本身是个伪命题。多线程之间使用队列是一定需要做到同步的。也就是说一定是需要同步手段的,一定要在一个线程读写的时候,阻塞另一个线程。既然不然用锁,那就是用原子变量吧。 2CAS3实现队列,这里使用链... 查看详情

并发无锁队列

1、前言    队列在计算机中非常重要的一种数据结构,尤其在操作系统中。队列典型的特征是先进先出(FIFO),符合流水线业务流程。在进程间通信、网络通信之间经常采用队列做缓存,缓解数据处理压力。结合自己在工... 查看详情

原子操作实现无锁队列

关于CAS等原子操作在开始说无锁队列之前,我们需要知道一个很重要的技术就是CAS操作——Compare&Set或是Compare&Swap,现在几乎所有的CPU指令都支持CAS的原子操作,X86下对应的是 CMPXCHG 汇编指令。有了这个原子... 查看详情

如何在无锁并发队列中实现“Front”方法?

】如何在无锁并发队列中实现“Front”方法?【英文标题】:Howtoimplement"Front"methodinalock-freeconcurrentqueue?【发布时间】:2019-11-2523:54:18【问题描述】:我正在尝试实现一个并发无锁队列。我正在密切关注这篇论文:https://www... 查看详情

无锁队列的实现(陈皓)(代码片段)

在开始说无锁队列之前,我们需要知道一个很重要的技术就是CAS操作——Compare&Set,或是Compare&Swap,现在几乎所有的CPU指令都支持CAS的原子操作,X86下对应的是CMPXCHG汇编指令。有了这个原子操作,我们就... 查看详情

c++11 2线程无锁队列

】c++112线程无锁队列【英文标题】:c++11lock-freequeuewith2thread【发布时间】:2020-07-3020:33:09【问题描述】:除了主线程,我还有一个线程接收数据并将它们写入文件。std::queue<std::vector<int>>dataQueue;std::mutexmutex;voidsetData(consts... 查看详情

使用无锁指针队列在线程之间移动数据是不是安全

】使用无锁指针队列在线程之间移动数据是不是安全【英文标题】:Isitsafetomovedatabetweenthreadsusingalocklessqueueofpointers使用无锁指针队列在线程之间移动数据是否安全【发布时间】:2017-11-0312:34:26【问题描述】:我已经实现了一个... 查看详情