java无锁队列disruptor,内存队列的生产解决方案(代码片段)

BBinChina BBinChina     2022-12-30     108

关键词:

背景

Disruptor是英国外汇交易所LMAX开源的用于生产交易中的内存队列。

为了实现高性能交易撮合队列时,现在普遍的交易撮合引擎都采用了内存队列的方式,这种方式减少了持久化过程中带来的磁盘IO延迟,可以提交整体的交易性能。

Disruptor便是这样场景中诞生的,在实际使用过程中,LMAX基于Disruptor开发的系统单线程能支撑每秒600万订单。

Java队列

内存队列通常用于内存共享场景下的,而共享内存与锁有必然分不开的情缘,在java内置的线程安全的队列有以下:

队列有界性数据结构
ArrayBlockingQueuebounded加锁arraylist
LinkedBlockingQueueoptionally-bounded加锁linkedlist
ConcurrentLinkedQueueunbounded无锁linkedlist
LinkedTransferQueueunbounded无锁linkedlist
PriorityBlockingQueueunbounded加锁heap
DelayQueueunbounded加锁heap

ConcurrentLinkedQueue以及LinkedTransferQueue的无锁都是通过原子变量compare and swap(以下简称“CAS”)操作的。

为什么要无锁

现实编程过程中,加锁通常会严重地影响性能。线程会因为竞争不到锁而被挂起,等锁被释放的时候,线程又会被恢复,这个过程中存在着很大的开销,并且通常会有较长时间的中断,因为当一个线程正在等待锁时,它不能做任何其他事情。如果一个线程在持有锁的情况下被延迟执行,例如发生了缺页错误、调度延迟或者其它类似情况,那么所有需要这个锁的线程都无法执行下去。如果被阻塞线程的优先级较高,而持有锁的线程优先级较低,就会发生优先级反转。

Disruptor论文中讲述了一个实验:

这个测试程序调用了一个函数,该函数会对一个64位的计数器循环自增5亿次。
机器环境:2.4G 6核
运算: 64位的计数器累加5亿次
其测试结果:

MethodTime (ms)
Single thread300
Single thread with CAS5,700
Single thread with lock10,000
Single thread with volatile write4,700
Two threads with CAS30,000
Two threads with lock224,000

实际开发过的应该也知道这个结论:不加锁是最快的。

虽然知道无锁最快,但是我们核心还是要解决地是共享数据多线程安全访问呀(单线程能处理那么单线程最好了,不如actor模型,每个actor就是一个独占线程,反正永不停歇,那就没必要线程切换,保证效能)。

下面是ArrayBlockingQueue通过加锁的方式实现的offer方法,保证线程安全。

public boolean offer(E e) 
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try 
        if (count == items.length)
            return false;
        else 
            insert(e);
            return true;
        
     finally 
        lock.unlock();
    

下面是AtomicInteger的getAndAdd方法。CAS是CPU的一个指令,由CPU保证原子性。

CAS

/**
 * Atomically adds the given value to the current value.
 *
 * @param delta the value to add
 * @return the previous value
 */
public final int getAndAdd(int delta) 
    for (;;) 
        int current = get();
        int next = current + delta;
        if (compareAndSet(current, next))
            return current;
    

  
/**
 * Atomically sets the value to the given updated value
 * if the current value @code == the expected value.
 *
 * @param expect the expected value
 * @param update the new value
 * @return true if successful. False return indicates that
 * the actual value was not equal to the expected value.
 */
public final boolean compareAndSet(int expect, int update) 
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
 

Disruptor的设计方案

Disruptor通过以下设计来解决队列速度慢的问题:

  1. 环形数组结构
    为了避免垃圾回收,采用数组而非链表。同时,数组对处理器的缓存机制更加友好。
  2. 元素位置定位
    数组长度2^n,通过位运算,加快定位的速度。下标采取递增的形式。不用担心index溢出的问题。index是long类型,即使100万QPS的处理速度,也需要30万年才能用完。
  3. 无锁设计
    每个生产者或者消费者线程,会先申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据。

Demo

package com.meituan.Disruptor;

/**
 * @description disruptor代码样例。每10ms向disruptor中插入一个元素,消费者读取数据,并打印到终端
 */
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

import java.util.concurrent.ThreadFactory;


public class DisruptorMain

    public static void main(String[] args) throws Exception
    
        // 队列中的元素
        class Element 

            private int value;

            public int get()
                return value;
            

            public void set(int value)
                this.value= value;
            

        

        // 生产者的线程工厂
        ThreadFactory threadFactory = new ThreadFactory()
            @Override
            public Thread newThread(Runnable r) 
                return new Thread(r, "simpleThread");
            
        ;

        // RingBuffer生产工厂,初始化RingBuffer的时候使用
        EventFactory<Element> factory = new EventFactory<Element>() 
            @Override
            public Element newInstance() 
                return new Element();
            
        ;

        // 处理Event的handler
        EventHandler<Element> handler = new EventHandler<Element>()
            @Override
            public void onEvent(Element element, long sequence, boolean endOfBatch)
            
                System.out.println("Element: " + element.get());
            
        ;

        // 阻塞策略
        BlockingWaitStrategy strategy = new BlockingWaitStrategy();

        // 指定RingBuffer的大小
        int bufferSize = 16;

        // 创建disruptor,采用单生产者模式
        Disruptor<Element> disruptor = new Disruptor(factory, bufferSize, threadFactory, ProducerType.SINGLE, strategy);

        // 设置EventHandler
        disruptor.handleEventsWith(handler);

        // 启动disruptor的线程
        disruptor.start();

        RingBuffer<Element> ringBuffer = disruptor.getRingBuffer();

        for (int l = 0; true; l++)
        
            // 获取下一个可用位置的下标
            long sequence = ringBuffer.next();  
            try
            
                // 返回可用位置的元素
                Element event = ringBuffer.get(sequence); 
                // 设置该位置元素的值
                event.set(l); 
            
            finally
            
                ringBuffer.publish(sequence);
            
            Thread.sleep(10);
        
    

深入理解disruptor(代码片段)

Disruptor通过缓存行填充,利用CPU高速缓存,只是Disruptor“快”的一个因素,快的另一因素是“无锁”,尽可能发挥CPU本身的高速处理性能。1缓慢的锁Disruptor作为一个高性能的生产者-消费者队列系统,核心就是... 查看详情

springbootdisruptor构建高性能内存队列(代码片段)

...比较低,尽量使用无锁设计。接下来就我们来认识下Disruptor。Disruptor简单使用github地址:https://githu 查看详情

高性能队列——disruptor(代码片段)

文章目录声明:高性能队列——Disruptor背景Java内置队列ArrayBlockingQueue的问题加锁伪共享Disruptor的设计方案一个生产者多个生产者读数据写数据总结性能等待策略生产者的等待策略消费者的等待策略Log4j2应用场景性能差异参考... 查看详情

聊一聊disruptor-无锁并发框架

参考技术ADisruptor论文中讲述一个实验,一个计数器循环自增5亿次CAS:CompareAndSwap/Set顾名思义比较和交换CPU级别的指令,cpu去更新一个值,但如果跟新过程中值发生了变化,操作就失败,然后重试,直到更新成功!Disruptor的sequence... 查看详情

springbootdisruptor高性能内存消息队列(代码片段)

一、背景工作中遇到项目使用Disruptor做消息队列,对你没看错,不是Kafka,也不是rabbitmq;Disruptor有个最大的优点就是快,还有一点它是开源的哦,下面做个简单的记录。二、Disruptor介绍Disruptor是英国外汇交易公司LMAX开发的一个高性能队... 查看详情

单机最快的队列disruptor解析和使用(代码片段)

前言介绍高性能队列Disruptor原理以及使用例子。Disruptor是什么?Disruptor是外汇和加密货币交易所运营商LMAXgroup建立高性能的金融交易所的结果。用于解决生产者、消费者及其数据存储的设计问题的高性能队列实现。可以对标JDK中... 查看详情

并发无锁队列学习(单生产者单消费者模型)

1、引言本文介绍单生产者单消费者模型的队列。依据写入队列的内容是定长还是变长,分为单生产者单消费者定长队列和单生产者单消费者变长队列两种。单生产者单消费者模型的队列操作过程是不须要进行加锁的。生产者通... 查看详情

disruptor快速入门

...的效率通常较低.有没有使用CAS机制实现的生产者-消费者?Disruptor就是这样.disruptor使用观察者模式,主动将消息发送给消费者,而不是等消费者从队列中取;在无锁的情况下,实现queue(环形,RingBuffer)的并发操作,性能远高于Blockin 查看详情

C中的任何单消费者单生产者无锁队列实现?

】C中的任何单消费者单生产者无锁队列实现?【英文标题】:Anysingle-consumersingle-producerlockfreequeueimplementationinC?【发布时间】:2009-06-1312:57:27【问题描述】:我正在写一个带有消费者线程和生产者线程的程序,现在看来队列同步... 查看详情

并发无锁队列

1、前言    队列在计算机中非常重要的一种数据结构,尤其在操作系统中。队列典型的特征是先进先出(FIFO),符合流水线业务流程。在进程间通信、网络通信之间经常采用队列做缓存,缓解数据处理压力。结合自己在工... 查看详情

lockfreequeue无锁队列实现与总结(代码片段)

无锁队列介绍  在工程上,为了解决两个处理器交互速度不一致的问题,我们使用队列作为缓存,生产者将数据放入队列,消费者从队列中取出数据。这个时候就会出现四种情况,单生产者单消费者,多生产者单消费者,单生... 查看详情

c++11并发队列的生产方案blockingconcurrentqueue(代码片段)

...oncurrentQueue这是我在生产中使用到的一个c++11实现的无锁队列,有以下特性:1、线程安全的无锁队列2、支持c++11的move语义,优化对象拷贝性能3、模板化4、可预分配内存、也可动态分配5、支持批量处理6、... 查看详情

disruptor使用(代码片段)

Disruptor作者,介绍Disruptor能每秒处理600万订单。这是一个可怕的数字。disruptor之所以那么快,是因为内部采用环形队列和无锁设计。使用cas来进行并发控制。通过获取可用下标来对事件发布和消费下标通过cas控制(Atomic)disruptor组... 查看详情

提升--20---disruptor-----号称最快的消息队列(代码片段)

文章目录Disruptor介绍:==单机最快的MQ。性能非常的高==特性:Disruptor简单理解:==就是内存里用于存放元素的一个高效率的队列==资料:观察者模式:Disruptor核心与特点:Disruptor的核心是一个环形的buffer。线性表--0... 查看详情

disruptor源码分析

本文将介绍Disruptor的工作机制,并分析Disruptor的主要源码基于的版本是3.3.7(发布于2017.09.28)水平有限,如有谬误请留言指正 0. 什么是Disruptor?Disruptor是一个开源的并发框架,提供了类似于Java中有界队列的功能,主要用... 查看详情

springbootdisruptor高性能内存消息队列(代码片段)

一、背景工作中遇到项目使用Disruptor做消息队列,对你没看错,不是Kafka,也不是rabbitmq;Disruptor有个最大的优点就是快,还有一点它是开源的哦,下面做个简单的记录。二、Disruptor介绍Disruptor是英国外汇交易公司LMAX开发的一个高性能队... 查看详情

cas无锁队列(代码片段)

    队列是常用的数据结构,采用的FIFO(firstinfirstout)原则,新元素(等待进入队列的元素)总是被插入到尾部,而读取的时候总是从头部开始读取。在计算中队列一般用来做排队(如线程池的等待排队,锁的等待排... 查看详情

springboot+disruptor=王炸!!

...众号,Java干货及时送达01、背景工作中遇到项目使用Disruptor做消息队列,对你没看错,不是Kafka,也不是rabbitmq;Disruptor有个最大的优点就是快,还有一点它是开源的哦,下面做个简单的记录.02、Disruptor介绍Disruptor是英国外汇交易公司... 查看详情