多线程环境下队列操作之锁的教训

拙园 拙园     2022-10-04     257

关键词:

之前一直在研究多线程环境下的编程方法,却很少实战体验,以至于我一提到多线程编程,我总是信心不足,又总是说不出到底哪里不明白。今天工程现场反馈了一个“老问题”,我一直担心的是DAServer的运行机制有什么我不明白的地方,DAS Toolkit中总有一部分是我没有仔细研究的,在我心中有阴影,所以工程出了问题我第一反应就是“会不会问题出在阴影里?”。结果,至今为止,我总结起来问题90%都是在自己编码部分。

  我的DAServer中有一个需求,就是上送某个定点数据的速度不能太快,否则后台接收不过来。于是,我设计了一个类,包含了两条队列,其中一条队列是需要上送的数据,另一条队列是历史上曾经更新过的点记录及上送时间。每次组装报文时需要检查队列中是否有某个点,以及该点是否在1.5秒内“曾经上送过”。如果上送过就等下次再来检查,如果没有上送过则上送此信息并记录一下上送时间。

  我在设计这个类时一直在关注逻辑,却忽略了这是一个多线程环境的应用——装入队列数据的线程和取出队列数据的线程是不同的。好吧,只能说明我们的应用场景太低端,多线程争用资源的场景不是太频繁,以至于我过了这么久才明白过来。这个类的设计代码如下:

技术分享图片
技术分享图片
 1 struct SUpdateValue
 2 {
 3     int m_dataID;            // 数据点号
 4     DASVariant m_dataValue;    // 数据值
 5     SUpdateValue* next;
 6 
 7     SUpdateValue() : m_dataID(0), next(NULL){ }
 8     SUpdateValue(int id, DASVariant& val) : m_dataID(id), m_dataValue(val), next(NULL){    }
 9 };
10 
11 struct SUpdateTime
12 {
13     int dataID;            // 数据点号
14     long lastSendTime;    // 上次上送缓冲数据的时间戳(毫秒)
15     SUpdateTime* next;
16 
17     SUpdateTime() : dataID(0), lastSendTime(0), next(NULL){}
18 };
19 
20 class CDelayQueue
21 {
22 public:
23     CDelayQueue();
24     ~CDelayQueue();
25     bool EnqueFrame(int dataID, DASVariant& dataValue);        // 向延迟队列中装入新数据
26     bool DequeFrame(int dataID, DASVariant& dataValue);        // 从延迟队列中取出指定的数据
27     void DelayUpdateTime(int dataID);    // 设定数据点的更新时间戳,一般用于防止过快上送
28     int GetFrameCount(void);            // 获取延迟队列中的对象个数
29     string GetFrameList(void);            // 获取对象名称列表,以逗号分隔
30 
31 protected:
32     bool AskForUpdate(int dataID);        // 申请上送新数据
33 
34 private:
35     SUpdateValue* m_pHead;        // 需要延迟发送的数据队列
36     SUpdateValue* m_pTail;
37     SUpdateTime* m_pTimeHead;    // 已经发送的数据队列的历史记录(超时后失去记录)
38 };
技术分享图片

  实现部分如下:

技术分享图片
技术分享图片
  1 CDelayQueue::CDelayQueue() : m_pHead(NULL), m_pTail(NULL), m_pTimeHead(NULL)
  2 {
  3 }
  4 
  5 CDelayQueue::~CDelayQueue()
  6 {
  7     while (NULL != m_pHead)
  8     {
  9         SUpdateValue* p = m_pHead;
 10         m_pHead = m_pHead->next;
 11         delete p;
 12     }
 13 
 14     while (NULL != m_pTimeHead)
 15     {
 16         SUpdateTime* p = m_pTimeHead;
 17         m_pTimeHead = m_pTimeHead->next;
 18         delete p;
 19     }
 20 }
 21 
 22 bool CDelayQueue::EnqueFrame(int dataID, DASVariant& dataValue)
 23 {
 24     SUpdateValue* pNew = new SUpdateValue(dataID, dataValue);
 25     if (NULL == pNew)
 26     {
 27         return false;
 28     }
 29 
 30     if (NULL == m_pHead)
 31     {
 32         m_pHead = m_pTail = pNew;
 33     }
 34     else
 35     {
 36         m_pTail->next = pNew;
 37         m_pTail = m_pTail->next;
 38     }
 39 
 40     return true;
 41 }
 42 
 43 bool CDelayQueue::DequeFrame(int dataID, DASVariant& dataValue)
 44 {
 45     if (NULL == m_pHead)
 46     {
 47         return false;
 48     }
 49 
 50     // 检查队列中是否存在该点
 51     if (m_pHead->m_dataID == dataID)
 52     {
 53         if (AskForUpdate(dataID))
 54         {
 55             dataValue = m_pHead->m_dataValue;
 56             SUpdateValue* pDel = m_pHead;
 57             m_pHead = m_pHead->next;
 58             delete pDel;
 59             return true;
 60         }
 61         return false;
 62     }
 63     
 64     SUpdateValue* pPre = m_pHead;
 65     SUpdateValue* pValue = m_pHead->next;
 66     while (pValue != NULL)
 67     {
 68         if (pValue->m_dataID == dataID)
 69         {
 70             if (AskForUpdate(pValue->m_dataID))
 71             {
 72                 dataValue = pValue->m_dataValue;
 73                 pPre->next = pValue->next;
 74                 delete pValue;
 75                 return true;
 76             }
 77             return false;
 78         }
 79         pPre = pValue;
 80         pValue = pPre->next;
 81     }
 82 
 83     return false;
 84 }
 85 
 86 bool CDelayQueue::AskForUpdate(int dataID)
 87 {
 88     long curTime = GetTickCount();
 89 
 90     // 检查是否在短时间内更新过该数据点
 91     SUpdateTime* pList = m_pTimeHead;
 92     while (pList)
 93     {
 94         if (pList->dataID == dataID)
 95         {
 96             if ((curTime - pList->lastSendTime) < 1500)        // xiaoku
 97             {
 98                 return false;
 99             }
100             pList->lastSendTime = curTime;
101             return true;
102         }
103         pList = pList->next;
104     }
105 
106     // 如果记录中没有目标点,则创建历史记录
107     if (NULL == pList)
108     {
109         pList = new SUpdateTime();
110         pList->dataID = dataID;
111         pList->lastSendTime = curTime;
112         pList->next = m_pTimeHead;
113         m_pTimeHead = pList;
114     }
115 
116     return true;
117 }
118 
119 void CDelayQueue::DelayUpdateTime(int dataID)
120 {
121     long curTime = ::GetTickCount();
122     SUpdateTime* pList = m_pTimeHead;
123     while (pList)
124     {
125         if (pList->dataID == dataID)
126         {
127             pList->lastSendTime = curTime - 500;
128             return ;
129         }
130         pList = pList->next;
131     }
132 
133     // 如果记录中没有目标点,则创建历史记录
134     if (NULL == pList)
135     {
136         pList = new SUpdateTime();
137         pList->dataID = dataID;
138         pList->lastSendTime = curTime - 500;
139         pList->next = m_pTimeHead;
140         m_pTimeHead = pList;
141     }
142 }
143 
144 int CDelayQueue::GetFrameCount()
145 {
146     if (NULL == m_pHead)
147     {
148         return 0;
149     }
150 
151     int count = 1;
152     SUpdateValue* p = m_pHead;
153     while(p != m_pTail)
154     {
155         ++count;
156         p = p->next;
157     }
158     return count;
159 }
160 
161 string CDelayQueue::GetFrameList()
162 {
163     string strNameList("");
164     if (NULL == m_pHead)
165     {
166         return strNameList;
167     }
168     SUpdateValue* p = m_pHead;
169     char szData[16];
170     sprintf_s(szData, 16, "%d", p->m_dataID);
171     strNameList += szData;
172     while(p != m_pTail)
173     {
174         sprintf_s(szData, 16, "%d", p->next->m_dataID);
175         strNameList += ";";
176         strNameList += szData;
177         p = p->next;
178     }
179 
180     return strNameList;
181 }
技术分享图片

  最关键两个接口是EnqueFrame() 和 DequeFrame() ,分别是装入队列和从队列中取数。都是在操作队列,如果是不同的线程,怎么能不考虑线程互斥的问题呢?好吧,迅速引入LockerGuard。

  这个失败的例子放在这里警示一下自己!

多线程之锁机制(代码片段)

前言  在Java并发编程实战,会经常遇到多个线程访问同一个资源的情况,这个时候就需要维护数据的一致性,否则会出现各种数据错误,其中一种同步方式就是利用Synchronized关键字执行锁机制,锁机制是先给共享资源上锁,... 查看详情

java浅谈线程安全之锁(代码片段)

...概念。1、本地锁:在单进程的系统中,存在多个线程去同时操作某个共享变量时,就需要使用本地锁,最常用的关键字:synchronized2、分布式锁:在分布式系统中,我们知道会有多台服务器同时部署一... 查看详情

java并发之锁的使用浅析

    锁像synchronized同步块一样,是一种线程同步机制。让自Java5开始,java.util.concurrent.locks包提供了另一种方式实现线程同步机制——Lock。那么问题来了既然都可以通过synchronized来实现同步访问了,那么为什... 查看详情

数据库之锁的概念和显示锁的使用

...中就会产生多个事务同时存取同一数据的情况。若对并发操作不加控制就可能会读取和存储 查看详情

计算机基础之锁的分类

参考技术A避免多个线程同时读写同一个数据而产生不可预料的后果,我们需要将各个线程对同一个数据的访问同步。所谓同步,既是指在一个线程访问数据未结束时,其他线程不得对同一个数据进行访问。同步的最常见方法是... 查看详情

利用双缓冲队列来减少锁的竞争

...可少的。但是我们也清楚对同一个文本进行写日志只能单线程的去写,那么我们也经常会使用简单lock锁来保证只有一个线程来写入日志信息。但是在多线程的去写日志信息的时候,由于记录日志信息是需要进行I/O交互的,导致... 查看详情

分布式环境下的并发编程(代码片段)

在JAVA多线程编程中,经常会用到synchronized、lock和原子变量等,分布式系统中,由于分布式系统的分布性,即多线程和多进程并且分布在不同机器中,synchronized和lock这两种锁将失去原有锁的效果,需要我们自己实现分布式锁来处... 查看详情

多线程环境下,程序真是危机四伏(代码片段)

姿势在不断的更新迭代,太卷了。你管这也叫线程安全?    最近大意了,竟然想将《面试官:实现一个带值变更通知能力的Dictionary》一文中的临界锁只应用到写操作。内心旁白:读操作又不会修改数据ÿ... 查看详情

java之锁-volatile

锁是JAVA多线程关键,也是面试中必问的,在此好好总结一下。(先要从进程和线程说起,此处先欠下,回头专门说一下操作系统是怎么管理进程和线程的)说到多线程就要说说JAVA的内存模型:图片来自于网络。  Java内存模型... 查看详情

多进程多线程在不同环境下的操作(代码片段)

多进程、多线程在不同环境下的操作多进程:Linux创建进程是操作系统把父进程的东西拷贝到子进程    Windows创建进程类似于模块导入  Linux环境下开启多进程,可以用os里的fork1importos2importtime3pid=os.fork()45ifpid==0:#子进程永... 查看详情

多线程编程之原子锁

 在《多线程编程之数据访问互斥》一文中简单介绍了原子锁,这里再详细说一下原子锁的概念和用途。(1)简单数据操作  如果在一个多线程环境下对某个变量进行简单数学运算或者逻辑运算,那么就应该使用原子锁操作... 查看详情

java多线程与并发模型之锁

这是一篇总结Java多线程开发的长文。文章是从Java创建之初就存在的synchronized关键字引入,对Java多线程和并发模型进行了探讨。希望通过此篇内容的解读能帮助Java开发者更好的理清Java并发编程的脉络。互联网上充斥着对Java多线... 查看详情

多线程编程之读写锁

 在《多线程编程之Linux环境下的多线程(二)》一文中提到了Linux环境下的多线程同步机制之一的读写锁。本文再详细写一下读写锁的概念和原理。一、什么是读写锁  读写锁(也叫共享-独占锁)实际是一种特殊的自旋锁,... 查看详情

线程同步互斥锁和读写锁的区别和各自适用场景

...等待,唤醒时优先考虑写者)互斥锁特点:一次只能一个线程拥有互斥锁,其他线程只有等待自旋锁:一次只能有一个进程进入临界区,读写锁是自旋锁的一个特例。应用场景:以队列操作为例:线程A对队列负责将数据写入队... 查看详情

第七天学习多线程同步和锁(代码片段)

多线程的线程同步和锁线程同步当多个线程访问同一个对象,并且线程还想修改对象,这时候就需要线程同步,线程同步其实就是一个等待机制,多个需要访问此对象的线程进入对象的等待池形成队列,等待前一个线程使用完毕... 查看详情

ios底层探索之多线程(十三)—锁的种类你知多少?(代码片段)

...客开始将对锁的相关内容进行分析!iOS底层探索之多线程(一)—进程和线程iOS底层探索之多线程(二)—线程和锁iOS底层探索之多线程(三)—初识GCDiOS底层探索之多线程(四)—GCD的队列iOS底层探索之多线程(五)—GCD不同队列源码分... 查看详情

收藏已修改;枚举操作可能无法执行(带锁的多线程)

】收藏已修改;枚举操作可能无法执行(带锁的多线程)【英文标题】:Collectionwasmodified;enumerationoperationmaynotexecute(MultiThreadingwithLocks)【发布时间】:2021-06-1002:19:54【问题描述】:所以我知道这个问题以前在这里被问过,但这里... 查看详情

平时不用面试又爱问系列之锁优化问题(代码片段)

...,否则表示加锁失败(更新的内存区域可以想象成哪个线程持有了这个锁)࿰ 查看详情