关键词:
【中文标题】MPSC 队列:竞争条件【英文标题】:MPSC Queue: Race Condition 【发布时间】:2019-07-04 11:10:27 【问题描述】:我正在尝试基于this one written in C by Dmitry Vyukov 实现一个无锁多生产者单消费者队列。
到目前为止,我编写的单个测试几乎可以正常工作。但是消费者通常会错过一个项目,要么是第一个,要么是第二个。有时,消费者会错过大约一半的输入。
就像现在一样,它不是无锁的。每次使用 new
运算符时它都会锁定,但我希望在使用分配器之前让它工作并编写一些更详尽的测试。
// src/MpscQueue.hpp
#pragma once
#include <memory>
#include <atomic>
#include <optional>
/**
* Adapted from http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
* @tparam T
*/
template< typename T >
class MpscQueue
public:
MpscQueue()
stub.next.store( nullptr );
head.store( &stub );
tail = &stub;
void push( const T& t )
emplace( t );
void push( T&& t )
emplace( std::move( t ));
template< typename ... Args >
void emplace( Args...args )
auto node = new Node std::make_unique<T>( std::forward<Args>( args )... ), nullptr ;
push( node );
/**
* Returns an item from the queue and returns a unique pointer to it.
*
* If the queue is empty returns a unique pointer set to nullptr
*
* @return A unique ptr to the popped item
*/
std::unique_ptr<T> pop()
Node* tailCopy = tail;
Node* next = tailCopy->next.load();
auto finalize = [ & ]()
tail = next;
std::unique_ptr<Node> p( tailCopy ); // free the node memory after we return
return std::move( tail->value );
;
if ( tailCopy == &stub )
if ( next == nullptr ) return nullptr;
tail = next;
tailCopy = next;
next = next->next;
if ( next ) return std::move( finalize());
if ( tail != head.load()) return nullptr;
push( &stub );
next = tailCopy->next;
return next ? std::move( finalize()) : nullptr;
private:
struct Node
std::unique_ptr<T> value;
std::atomic<Node*> next;
;
void push( Node* node )
Node* prev = head.exchange( node );
prev->next = node;
Node stub;
std::atomic<Node*> head;
Node* tail;
;
// test/main.cpp
#pragma clang diagnostic push
#pragma ide diagnostic ignored "OCUnusedMacroInspection"
#define BOOST_TEST_MODULE test_module
#pragma clang diagnostic pop
#include <boost/test/unit_test.hpp>
// test/utils.hpp
#pragma once
#include <vector>
template< class T >
void removeFromBothIfIdentical( std::vector<T>& a, std::vector<T>& b )
size_t i = 0;
size_t j = 0;
while ( i < a.size() && j < b.size())
if ( a[ i ] == b[ j ] )
a.erase( a.begin() + i );
b.erase( b.begin() + j );
else if ( a[ i ] < b[ j ] ) ++i;
else if ( a[ i ] > b[ j ] ) ++j;
namespace std
template< typename T >
std::ostream& operator<<( std::ostream& ostream, const std::vector<T>& container )
if ( container.empty())
return ostream << "[]";
ostream << "[";
std::string_view separator;
for ( const auto& item: container )
ostream << item << separator;
separator = ", ";
return ostream << "]";
template< class T >
std::vector<T> extractDuplicates( std::vector<T>& container )
auto iter = std::unique( container.begin(), container.end());
std::vector<T> duplicates;
std::move( iter, container.end(), back_inserter( duplicates ));
return duplicates;
#define CHECK_EMPTY( container, message ) \
BOOST_CHECK_MESSAGE( (container).empty(), (message) << ": " << (container) )
// test/MpscQueue.cpp
#pragma ide diagnostic ignored "cert-err58-cpp"
#include <thread>
#include <numeric>
#include <boost/test/unit_test.hpp>
#include "../src/MpscQueue.hpp"
#include "utils.hpp"
using std::thread;
using std::vector;
using std::back_inserter;
BOOST_AUTO_TEST_SUITE( MpscQueueTestSuite )
BOOST_AUTO_TEST_CASE( two_producers )
constexpr int until = 1000;
MpscQueue<int> queue;
thread producerEven( [ & ]()
for ( int i = 0; i < until; i += 2 )
queue.push( i );
);
thread producerOdd( [ & ]()
for ( int i = 1; i < until; i += 2 )
queue.push( i );
);
vector<int> actual;
thread consumer( [ & ]()
using namespace std::chrono_literals;
std::this_thread::sleep_for( 2ms );
while ( auto n = queue.pop())
actual.push_back( *n );
);
producerEven.join();
producerOdd.join();
consumer.join();
vector<int> expected( until );
std::iota( expected.begin(), expected.end(), 0 );
std::sort( actual.begin(), actual.end());
vector<int> duplicates = extractDuplicates( actual );
removeFromBothIfIdentical( expected, actual );
CHECK_EMPTY( duplicates, "Duplicate items" );
CHECK_EMPTY( expected, "Missing items" );
CHECK_EMPTY( actual, "Extra items" );
BOOST_AUTO_TEST_SUITE_END()
【问题讨论】:
您在哪个处理器架构上运行测试? @SegFault,i64。特别是 Intel(R) Core(TM) i5-7500 CPU @ 3.40GHz 你的机器测试通过了吗? 【参考方案1】:下面我的多生产者、单消费者示例是用 Ada 编写的。我将此作为虚拟“伪代码”的来源供您考虑。该示例包含三个文件。
该示例实现了一个简单的数据记录器,其中包含多个生产者、一个共享缓冲区和一个记录生产者生成的字符串的消费者。
第一个文件是共享缓冲区的包规范。 Ada 包规范定义了包中定义的实体的 API。在这种情况下,实体是一个受保护的缓冲区和一个停止记录器的过程。
-----------------------------------------------------------------------
-- Asynchronous Data Logger
-----------------------------------------------------------------------
with Ada.Strings.Unbounded; use Ada.Strings.Unbounded;
package Async_Logger is
type Queue_Index is mod 256;
type Queue_T is array (Queue_Index) of Unbounded_String;
protected Buffer is
entry Put (Log_Entry : in String);
entry Get (Stamped_Entry : out Unbounded_String);
private
Queue : Queue_T;
P_Index : Queue_Index := 0;
G_Index : Queue_Index := 0;
Count : Natural := 0;
end Buffer;
procedure Stop_Logging;
end Async_Logger;
受保护缓冲区中的条目允许任务(即线程)写入缓冲区并从缓冲区读取。这些条目会自动执行所有必要的缓冲区锁定控制。
缓冲区代码和 Stop_Logging 过程的实现在包体中实现。记录日志的消费者也在任务主体中实现,使消费者对生产线程不可见。
with Ada.Calendar; use Ada.Calendar;
with Ada.Calendar.Formatting; use Ada.Calendar.Formatting;
with Ada.Text_IO; use Ada.Text_IO;
package body Async_Logger is
------------
-- Buffer --
------------
protected body Buffer is
---------
-- Put --
---------
entry Put (Log_Entry : in String) when Count < Queue_Index'Modulus is
T_Stamp : Time := Clock;
Value : Unbounded_String :=
To_Unbounded_String
(Image (Date => T_Stamp, Include_Time_Fraction => True) & " : " &
Log_Entry);
begin
Queue (P_Index) := Value;
P_Index := P_Index + 1;
Count := Count + 1;
end Put;
---------
-- Get --
---------
entry Get (Stamped_Entry : out Unbounded_String) when Count > 0 is
begin
Stamped_Entry := Queue (G_Index);
G_Index := G_Index + 1;
Count := Count - 1;
end Get;
end Buffer;
task Logger is
entry Stop;
end Logger;
task body Logger is
Phrase : Unbounded_String;
begin
loop
select
accept Stop;
exit;
else
select
Buffer.Get (Phrase);
Put_Line (To_String (Phrase));
or
delay 0.01;
end select;
end select;
end loop;
end Logger;
procedure Stop_Logging is
begin
Logger.Stop;
end Stop_Logging;
end Async_Logger;
Put 条目有一个保护条件,允许该条目仅在缓冲区未满时执行。 Get 条目有一个保护条件,允许该条目仅在缓冲区为空时执行。
名为 Logger 的任务是消费者任务。它一直运行到它的 Stop 条目被调用为止。
Stop_Logging 过程调用 Logger 的 Stop 条目。
第三个文件是用于测试 Async_Logger 包的“主”过程。该文件创建了两个生产者,P1 和 P2。这些生产者每人向 Buffer 写入 10 条消息,然后退出。
with Async_Logger; use Async_Logger;
procedure Async_Test is
task P1;
task P2;
task body P1 is
begin
for I in 1..10 loop
Buffer.Put(I'Image);
delay 0.01;
end loop;
end P1;
task body P2 is
Num : Float := 0.0;
begin
for I in 1..10 loop
Buffer.Put(Num'Image);
Num := Num + 1.0;
delay 0.01;
end loop;
end P2;
begin
delay 0.2;
Stop_Logging;
end Async_Test;
Async_Test 过程只需等待 0.2 秒,然后调用 Stop_Logging。
运行这个程序的输出是:
2019-02-11 18:35:01.83 : 1
2019-02-11 18:35:01.83 : 0.00000E+00
2019-02-11 18:35:01.85 : 1.00000E+00
2019-02-11 18:35:01.85 : 2
2019-02-11 18:35:01.87 : 3
2019-02-11 18:35:01.87 : 2.00000E+00
2019-02-11 18:35:01.88 : 3.00000E+00
2019-02-11 18:35:01.88 : 4
2019-02-11 18:35:01.90 : 5
2019-02-11 18:35:01.90 : 4.00000E+00
2019-02-11 18:35:01.92 : 6
2019-02-11 18:35:01.92 : 5.00000E+00
2019-02-11 18:35:01.93 : 6.00000E+00
2019-02-11 18:35:01.93 : 7
2019-02-11 18:35:01.95 : 7.00000E+00
2019-02-11 18:35:01.95 : 8
2019-02-11 18:35:01.96 : 8.00000E+00
2019-02-11 18:35:01.96 : 9
2019-02-11 18:35:01.98 : 10
2019-02-11 18:35:01.98 : 9.00000E+00
【讨论】:
我以前从未使用过 ada,我不太了解它如何同步所有内容的细节。队列是线程安全的结构吗?是阻塞吗?另外,我的目标是永远不要阻塞,除非它正在获取内存以创建新节点。 Ada 保护类型和受保护对象是线程安全的。每个条目都会自动处理锁定和解锁。每个条目上的保护条件也会导致调用任务暂停,直到保护条件评估为 True。在多生产者单消费者模式中需要阻塞,因为只有当没有其他生产者写入共享缓冲区时,每个生产者才必须写入共享缓冲区,并且当生产者写入缓冲区时,消费者不能从共享缓冲区中读取。此类行为会造成竞争条件和损坏的缓冲区状态。 受保护条目中的代码可以与 C 或 C++ 中的临界区进行比较。所有这些操作都必须以防止竞争条件的方式执行。在我上面的示例中,缓冲区包含一个队列以及 P_Index、C_Index 和 Count。这些都是状态变量,必须与缓冲区上的每个操作保持一致。如果两个生产者修改同一个队列元素,结果是数据损坏或丢失。同样,两个生产者试图同时更新 P_Index 和 Count 会产生不可预知的结果。【参考方案2】:您的推送功能缺少该行:
node->next = nullptr;
在顶部。
查看我的实现以及 cmets 中的大量分析, 这里:https://github.com/CarloWood/ai-utils/blob/master/threading/MpscQueue.h
【讨论】:
链接已损坏。 谢谢!我最近将线程相关的实用程序移到了另一个 git 子模块;我现在在帖子中修复了链接。std::C++ 在非空队列中异步读写会导致竞争条件吗?
】std::C++在非空队列中异步读写会导致竞争条件吗?【英文标题】:std::C++Canreadingandwritingasynchronouslyinnonemptyqueueleadtoracecondition?【发布时间】:2020-11-2501:31:17【问题描述】:我将std::queue用于发布-订阅类型系统。主线程将一些数据... 查看详情
在事件驱动的嵌入式系统中使用事件队列避免竞争条件
】在事件驱动的嵌入式系统中使用事件队列避免竞争条件【英文标题】:AvoidingRaceConditionwitheventqueueineventdrivenembeddedsystem【发布时间】:2021-03-2420:08:38【问题描述】:我正在尝试对stm32进行编程并使用事件驱动架构。例如,当发... 查看详情
秒懂:jctool的mpsc高性能无锁队列(史上最全+10w字长文)(代码片段)
文章很长,而且持续更新,建议收藏起来,慢慢读!疯狂创客圈总目录博客园版为您奉上珍贵的学习资源:免费赠送:《尼恩Java面试宝典》持续更新+史上最全+面试必备2000页+面试必备+大厂必备+... 查看详情
条件竞争漏洞
...D开启抓包,点击购买选中数据包==shift+r==出现多条数据包队列,前几条会是并发点击GO,发送成功,正常大辣条数目应该为4,现在买了11个二、使用burpsuit点击购买,bp抓包设置为空payload,设置发包次数,(线程一定要设大,默... 查看详情
CreateTimerQueueTimer 回调和竞争条件
...1-2423:18:21【问题描述】:我在我的应用程序中使用计时器队列,并将指向我自己的C++Timer对象之一的指针作为“参数”传递给回调(在CreateTimerQueueTimer中)。然后我在回调中调用对象的虚方法。Timer对象的析构函数将确 查看详情
juc并发类概览
JUC并发类及并发相关类概览,持续补充...AQS内部有两个队列,一个等待队列(前后节点),一个条件队列(后继节点),其实是通过链表方式实现;等待队列是双向链表;条件队列是单向链表;条件队列如果被唤醒,将后接到等... 查看详情
blockingqueued
一、BlockingQueuedBlockingQueued队列使用ReentrantLock和Condition(AQS实现)来实现的。Condition只能用于独占模式。条件队列中的节点永远不会被唤醒,一直阻塞者;要想唤醒,则需要把该节点放到CLH队列中,放入到CLH队列中才有机会去竞... 查看详情
Combine 的 receive(on:) 没有分派到串行队列,导致数据竞争
】Combine的receive(on:)没有分派到串行队列,导致数据竞争【英文标题】:Combine\'sreceive(on:)notdispatchingtoserialqueue,causingdatarace【发布时间】:2021-07-1116:22:03【问题描述】:根据Apple的说法,receive(on:options:)在给定队列上运行回调。我... 查看详情
条件竞争和恶性条件竞争
竞争条件指多个线程或者进程在读写一个共享数据时结果依赖于它们执行的相对时间的情形。 竞争条件发生在当多个进程或者线程在读写数据时,其最终的的结果依赖于多个进程的指令执行顺序。例如:考虑下面的例子假设... 查看详情
aqs基本原理(代码片段)
...、资源数等。AQS内部的数据结构与原理AQS内部实现了两个队列,一个同步队列,一个条件队列。同步队列的作用是:当线程获取资源失败之后,就进入同步队列的尾部保持自旋等待,不断判断自己是否是链表的头节点,如果是头... 查看详情
Django会话竞争条件?
】Django会话竞争条件?【英文标题】:Djangosessionracecondition?【发布时间】:2012-11-2418:00:54【问题描述】:总结:Django会话中是否存在竞争条件,我该如何预防?我对Django会话有一个有趣的问题,我认为这涉及由于同一用户的同时... 查看详情
如何修复条件变量等待/通知的竞争条件
】如何修复条件变量等待/通知的竞争条件【英文标题】:Howtofixraceconditionforconditionvariablewait/notify【发布时间】:2017-07-2720:54:07【问题描述】:这个问题的答案是错误的,因为它有可能陷入僵局。ConditionVariable-Wait/NotifyRaceCondition... 查看详情
竞争条件和解锁写入
】竞争条件和解锁写入【英文标题】:Raceconditionandunlockedwrite【发布时间】:2012-04-1709:40:41【问题描述】:我有一个关于竞争条件和同时写入的问题。我有一个从不同线程访问对象的类。我想仅按需计算一些值并缓存结果。出于... 查看详情
如何使用 mpsc 通道在线程之间创建环形通信?
】如何使用mpsc通道在线程之间创建环形通信?【英文标题】:Howtocreatearingcommunicationbetweenthreadsusingmpscchannels?【发布时间】:2020-09-2616:47:22【问题描述】:我想生成n个线程,这些线程能够与环形拓扑中的其他线程进行通信,例如... 查看详情
如何在java中重现竞争条件?
】如何在java中重现竞争条件?【英文标题】:Howtoreproduceraceconditioninjava?【发布时间】:2014-05-1516:05:34【问题描述】:我试图创建这样的竞争条件。classBankaccountprivateintbalance=101;publicintgetBalance()returnbalance;publicvoidwithdraw(inti)balance=b... 查看详情
goroutine 竞争条件解决方案
】goroutine竞争条件解决方案【英文标题】:goroutineraceconditionsolution【发布时间】:2019-08-2006:36:08【问题描述】:我正在尝试了解如何解决以下代码的这种竞争条件。sayHello:=func()fmt.Println("Hellofromgoroutine")gosayHello()time.Sleep(1)fmt.Printl... 查看详情
Slick Code 中的竞争条件
】SlickCode中的竞争条件【英文标题】:RaceconditioninSlickCode【发布时间】:2015-06-2807:46:46【问题描述】:我已经在specs2中编写了这个精巧的DAO及其单元测试。我的代码有竞争条件。当我运行相同的测试时,我得到不同的输出。... 查看详情
Java MySQL 防止竞争条件
】JavaMySQL防止竞争条件【英文标题】:JavaMySQLpreventracecondition【发布时间】:2016-03-2122:01:27【问题描述】:我编写了一个java应用程序,它启动异步线程以从同一个数据库读取和更新值。每个线程都从连接池(c3p0)中获取连接。我必... 查看详情