优化技术专题「线程间的高性能消息框架」终极关注disruptor的核心源码和java8的@contended伪共享指南(代码片段)

洛神灬殇 洛神灬殇     2023-01-12     387

关键词:

Disruptor原理分析

Disruptor关联好任务处理事件后,就调用了disruptor.start() 方法,可以看出在调用了 start() 方法后,消费者线程就已经开启。

启动Disruptor

start() ->开启 Disruptor,运行事件处理器。

public RingBuffer<T> start()
        checkOnlyStartedOnce();
        //在前面 handleEventsWith() 方法里添加的 handler 对象会加入到 consumerRepository 里,这里遍历 consumerRepository 开启消费者线程
        for (final ConsumerInfo consumerInfo : consumerRepository)
            //从线程池中获取一个线程来开启消费事件处理器。(消费者开启监听,一旦有生产者投递,即可消费)
            //这里开启的线程对象为BatchEventProcessor的实例
            consumerInfo.start(executor)
        return ringBuffer。

关联事件

handleEventsWith() -> createEventProcessors()调用的核心方法,作用是创建事件处理器。

@SafeVarargs
public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
        return createEventProcessors(new Sequence[0], handlers);

存储事件

将EventHandler对象绑定存储到consumerRepository内部,并且交由BatchEventProcessor处理器进行代理执行。

EventHandlerGroup<T> createEventProcessors(
        final Sequence[] barrierSequences,
        final EventHandler<? super T>[] eventHandlers)
        ...
        final Sequence[] processorSequences = new Sequence[eventHandlers.length];
        //创建 sequence 序号栅栏
        final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences)for (int i = 0, eventHandlersLength = eventHandlers.length。i < eventHandlersLength。i++)
            final EventHandler<? super T> eventHandler = eventHandlers[i];
            final BatchEventProcessor<T> batchEventProcessor = new BatchEventProcessor<>(ringBuffer, barrier, eventHandler)...
            //这里将消费者加入到 consumerRepository 中---ConsumerRepository
            consumerRepository.add(batchEventProcessor, eventHandler, barrier)。
            processorSequences[i] = batchEventProcessor.getSequence()
        ...
    
  • handleEventsWith() 方法中,可以看到构建了一个 BatchEventProcessor(继承了 Runnable 接口)对象,start()方法启动的同样也是这个对象的实例。

  • 这个对象继承自 EventProcessor ,EventProcessor 是 Disruptor 里非常核心的一个接口,它的实现类的作用是轮询接收RingBuffer提供的事件,并在没有可处理事件是实现等待策略。

  • 这个接口的实现类必须要关联一个线程去执行,通常我们不需要自己去实现它。

BatchEventProcessor类

BatchEventProcessor:主要事件循环,处理 Disruptor 中的 event,拥有消费者的 Sequence。

核心私有成员变量
  • Sequence :维护当前消费者消费的 ID。

  • SequenceBarrier :序号屏障,协调消费者的消费 ID,主要作用是获取消费者的可用序号,并提供等待策略的执行。

  • EventHandler<? super T> :消费者的消费逻辑(我们实现的业务逻辑)。

  • DataProvider :获取消费对象。RingBuffer 实现了此接口,主要是提供业务对象。

核心方法
  • processEvents():由于 BatchEventProcessor 继承自 Runnable 接口,所以在前面启动它后,run() 方法会执行,而 run() 方法内部则会调用此方法。
private void processEvents()
    
        T event = null。
        获取当前消费者维护的序号中并+1,即下一个消费序号
        long nextSequence = sequence.get() + 1Lwhile (true) 
            try 
                //获取可执行的最大的任务 ID,如果没有,waitFor() 方法内会进行等待
                final long availableSequence = sequenceBarrier.waitFor(nextSequence)if (batchStartAware != null && availableSequence >= nextSequence) 
                    batchStartAware.onBatchStart(availableSequence - nextSequence + 1)
                //不断获取对应位置的任务进行消费 直到上面查询到的 availableSequence 消费完
                while (nextSequence <= availableSequence) 
                    event = dataProvider.get(nextSequence)。
                    eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence)。
                    nextSequence++
                sequence.set(availableSequence)
            ...
        
 
  • 消费者事件处理器的核心代码,sequenceBarrier.waitFor(nextSequence) 方法内部,会比较当前消费者序号与可用序号的大小:

    • 当可用序号(availableSequence)大于当前消费者序号(nextSequence),再获取到当前可用的最大的事件序号 ID(waitFot()方法内部调用 sequencer.getHighestPublishedSequence(sequence, availableSequence)),进行循环消费。
    • 可用序号是维护在 ProcessingSequenceBarrier 里的,ProcessingSequenceBarrier 是通过 ringBuffer.newBarrier() 创建出来的。

由图可以看出,在获得可用序号时,SequenceBarrier 在 EventProcessor 和 RingBuffer中充当协调的角色。

多消费事件和单消费事件在dependentSequence 上的处理有一些不同,可以看下 ProcessingSequenceBarrier 的 dependentSequence 的赋值以及 get() 方法 (Util.getMinimumSequence(sequences))。

启动过程分析之生产者

首先调用了 ringBuffer.next() 方法,获取可用序号,再获取到该序号下事先通过 Eventfactory 创建好的空事件对象,在我们对空事件对象进行赋值后,再调用 publish 方法将事件发布,则消费者就可以获取进行消费了。

生产者这里的核心代码如下,这里我截取的是多生产者模式下的代码:

    public long next(int n)
        if (n < 1 || n > bufferSize) 
            throw new IllegalArgumentException("n must be > 0 and < bufferSize")
        long current。
        long next。
        do
            //cursor 为生产者维护的 sequence 序列,获取到当前可投递的的下标,即当前投递到该位置
            current = cursor.get()//再+n获取下一个下标,即下一次投递的位置。
            next = current + n。
            long wrapPoint = next - bufferSize。
            //目的:也是实现快读的读写。gatingSequenceCache独占缓存行
            long cachedGatingSequence = gatingSequenceCache.get()if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
                //获取消费者最小序号
                long gatingSequence = Util.getMinimumSequence(gatingSequences, current)if (wrapPoint > gatingSequence) 
                    //如果不符合,则阻塞线程 1ns(park()不会有死锁的问题)
                    LockSupport.parkNanos(1)// TODO, should we spin based on the wait strategy?
                    continue
                gatingSequenceCache.set(gatingSequence)
            //多个生产者时要保证线程安全(这里更新的 cursor 同时也是等待策略里的 waitFor() 方法的 cursor 参数,因此这里更新成功后,则等待策略会通过,表示有新的任务进来,就会消费)
            else if (cursor.compareAndSet(current, next))
                break
        while (true);
        return next。
    

cursor 对象和 Util.getMinimumSequence(gatingSequences, current) 方法,cursor 对象是生产者维护的一个生产者序号,标示当前生产者已经生产到哪一个位置以及下一个位置,它是 Sequence 类的一个实例化对象

  • 从图里可以看出,Sequence 继承以及间接继承了 RhsPadding 和 LhsPadding 类,而这俩个类都各定义了 7 个 long 类型的成员变量。

  • 而 Sequence 的 get() 方法返回的也是一个 long 类型的值 value。这是上一篇文章介绍的充缓存行,消除伪共享。

  • 在 64 位的计算机中,单个缓存行一般占 64 个字节,当 cpu 从换存里取数据时,会将该相关数据的其它数据取出来填满一个缓存行,这时如果其它数据更新,则缓存行缓存的该数据也会失效,当下次需要使用该数据时又需要重新从内存中提取数据。

  • ArrayBlockingQueue 获取数据时,很容易碰到伪共享导致缓存行失效,而 Disruptor这里当在 value 的左右各填充 7 个 long 类型的数据时,每次取都能确保该数据独占缓存行,也不会有其他的数据更新导致该数据失效。避免了伪共享的问题( jdk 的并发包下也有一些消除伪共享的设计)。


RingBuffer:它是一个首尾相接的环状的容器,用来在多线程中传递数据。第一张图里面创建 Disruptor 的多个参数其实都是用来创建 RingBuffer 的,比如生产者类型(单 or 多)、实例化工厂、容器长度、等待策略等。

简单分析,多个生产者同时向 ringbuffer 投递数据,假设此时俩个生产者将 ringbuffer 已经填满,因为 sequence 的序号是自增+1(若不满足获取条件则循环挂起当前线程),所以生产的时候能保证线程安全,只需要一个 sequence 即可。

当多消费者来消费的时候,因为消费速度不同,例如消费者 1 来消费 0、1,消费者 2 消费 2、4,消费者 3 消费 3。

当消费者消费完 0 后,消费者 2 消费完 2 后,消费者 3 消费完 3 后,生产者再往队列投递数据时,其他位置还未被消费,会投递到第 0 个位置, 此时再想投递数据时,虽然消费 2 的第二个位置空缺、消费者 3 的第三个位置空缺,消费者还在消费 1 时,无法继续投递。因为是通过比较消费者自身维护的 sequence 的最小的序号,来进行比较。

Util.getMinimumSequence(gatingSequences, current) 方法也就无需再多说,它就是为了获取到多个消费者的最小序号,判断当前 ringBuffer 中的剩余可用序号是否大于消费者最小序号,是的话,则不能投递,需要阻塞当前线程(LockSupport.parkNanos(1))。

当消费者消费速度大于生产者生产者速度,生产者还未来得及往队列写入,或者生产者生产速度大于消费者消费速度,此时怎么办呢?而且上面也多次提到没有满足条件的消费事件时,消费者会等待,接下来说一下消费者的等待策略。

个人常用的策略:

  • BlockingWaitStrategy 使用了锁,低效的策略。

  • SleepingWaitStrategy 对生产者线程的影响最小,适合用于异步日志类似的场景。(不加锁空等)

  • YieldingWaitStrategy 性能最好,适合用于低延迟的系统,在要求极高性能且之间处理线数小于 cpu 逻辑核心数的场景中,推荐使用。

@Override
    public long waitFor(
        final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier)
        throws AlertException, InterruptedException
        long availableSequence。
        int counter = SPIN_TRIES//100
        while ((availableSequence = dependentSequence.get()) < sequence)
            counter = applyWaitMethod(barrier, counter)
        return availableSequence。
    
    private int applyWaitMethod(final SequenceBarrier barrier, int counter)
        throws AlertException
    
        barrier.checkAlert()if (0 == counter)
        
            Thread.yield()
        else
        
            --counter。
        
        return counter。
    

Java 8 Contended注解

  • 在Java 8中,可以采用@Contended在类级别上的注释,来进行缓存行填充。这样,可以解决多线程情况下的伪共享冲突问题。

  • Contended可以用于类级别的修饰,同时也可以用于字段级别的修饰,当应用于字段级别时,被注释的字段将和其他字段隔离开来,会被加载在独立的缓存行上。在字段级别上,@Contended还支持一个“contention group”属性(Class-Level不支持),同一group的字段们在内存上将是连续(64字节范围内),但和其他他字段隔离开来。

@Contended注释的行为如下所示:

在类上应用Contended:

@Contended
    public static class ContendedTest2 
        private Object plainField1;
        private Object plainField2;
        private Object plainField3;
        private Object plainField4;
    

将使整个字段块的两端都被填充:(以下是使用 –XX:+PrintFieldLayout的输出)

TestContended$ContendedTest2: field layout
    Entire class is marked contended
     @140 --- instance fields start ---
     @140 "plainField1" Ljava.lang.Object;
     @144 "plainField2" Ljava.lang.Object;
     @148 "plainField3" Ljava.lang.Object;
     @152 "plainField4" Ljava.lang.Object;
     @288 --- instance fields end ---
     @288 --- instance ends ---

注意,我们使用了128 bytes的填充 – 2倍于大多数硬件缓存行的大小(cache line一般为64 bytes) – 来避免相邻扇区预取导致的伪共享冲突。

在字段上应用Contended:

public static class ContendedTest1 
        @Contended
        private Object contendedField1;
        private Object plainField1;
        private Object plainField2;
        private Object plainField3;
        private Object plainField4;
    

将导致该字段从连续的字段块中分离开来并高效的添加填充:

TestContended$ContendedTest1: field layout
     @ 12 --- instance fields start ---
     @ 12 "plainField1" Ljava.lang.Object;
     @ 16 "plainField2" Ljava.lang.Object;
     @ 20 "plainField3" Ljava.lang.Object;
     @ 24 "plainField4" Ljava.lang.Object;
     @156 "contendedField1" Ljava.lang.Object; (contended, group = 0)
     @288 --- instance fields end ---
     @288 --- instance ends ---

注解多个字段使他们分别被填充:

public static class ContendedTest4 
        @Contended
        private Object contendedField1;
        @Contended
        private Object contendedField2;
        private Object plainField3;
        private Object plainField4;
    

被注解的2个字段都被独立地填充:

TestContended$ContendedTest4: field layout
     @ 12 --- instance fields start ---
     @ 12 "plainField3" Ljava.lang.Object;
     @ 16 "plainField4" Ljava.lang.Object;
     @148 "contendedField1" Ljava.lang.Object; (contended, group = 0)
     @280 "contendedField2" Ljava.lang.Object; (contended, group = 0)
     @416 --- instance fields end ---
     @416 --- instance ends ---

在有些cases中,你会想对字段进行分组,同一组的字段会和其他字段有访问冲突,但是和同一组的没有。例如,(同一个线程的)代码同时更新2个字段是很常见的情况。

public static class ContendedTest5 
        @Contended("updater1")
        private Object contendedField1;

        @Contended("updater1")
        private Object contendedField2;

        @Contended("updater2")
        private Object contendedField3;

        private Object plainField5;
        private Object plainField6;
    

内存布局是:

TestContended$ContendedTest5: field layout
     @ 12 --- instance fields start ---
     @ 12 "plainField5" Ljava.lang.Object;
     @ 16 "plainField6" Ljava.lang.Object;
     @148 "contendedField1" Ljava.lang.Object; (contended, group = 12)
     @152 "contendedField2" Ljava.lang.Object; (contended, group = 12)
     @284 "contendedField3" Ljava.lang.Object; (contended, group = 15)
     @416 --- instance fields end ---
     @416 --- instance ends ---

@Contended在字段级别,并且带分组的情况下,是否能解决伪缓存问题。

import sun.misc.Contended;
public class VolatileLong 
    @Contended("group0")
    public volatile long value1 = 0L;
    @Contended("group0")
    public volatile long value2 = 0L;
    @Contended("group1")
    public volatile long value3 = 0L;  
    @Contended("group1")
    public volatile long value4 = 0L;  

用2个线程来修改字段

  • 测试1:线程0修改value1和value2;线程1修改value3和value4;他们都在同一组中。

  • 测试2:线程0修改value1和value3;线程1修改value2和value4;他们在不同组中。

测试1

public final class FalseSharing implements Runnable 
    public final static long ITERATIONS = 500L * 1000L * 1000L;
    private static Volatile Long volatileLong;
    private String groupId;
    public FalseSharing(String groupId) 
        this.groupId = groupId;
    
    public static void main(final String[] args) throws Exception 
        // Thread.sleep(10000);
        System.out.println("starting....");
        volatileLong = new VolatileLong();
        final long start = System.nanoTime();
        runTest();
        System.out.println("duration = " + (System.nanoTime() - start));
    

    private static void runTest() throws InterruptedException 
        Thread t0 = new Thread(new FalseSharing("t0"));
        Thread t1 = new Thread(new FalseSharing("t1"));
        t0.start();
        t1.start();
        t0.join();
        t1.join();
    
    public void run() 
        long i = ITERATIONS + 1;
        if (groupId.equals("t0")) 
            while (0 != --i) 
                volatileLong.value1 = i;
                volatileLong.value2 = i;
            
         else if (groupId.equals("t1")) 
            while (0 != --i) 
                volatileLong.value3 = i;
                volatileLong.value4 = i;
            
        
    


public void run() 
        long i = ITERATIONS + 1;
        if (groupId.equals("t0")) 
            while (0 != --i) 
                volatileLong.value1 = i;
                volatileLong.value3 = i;
            
         else if (groupId.equals("t1")) 
            while (0 != --i) 
                volatileLong.value2 = i;
                volatileLong.value4 = i;
            
        
    

优化技术专题「系统性能调优实战」终极关注应用系统性能调优及原理剖析(下册)

前提介绍承接上文:【优化技术专题】「系统性能调优实战」终极关注应用系统性能调优及原理剖析(上册)之后我们接下来进行相关的。流程相关分析优化通过access_log.txt日志分析,在特定时间段内,将请求至系统的url分组计... 查看详情

优化技术专题「系统性能调优实战」终极关注应用系统性能调手册指南(上册)

...升更高的能力和层次,所以便进行先关系统性质的学习和优化相关的技术做了一定的研究。调优背景因为当出现吞吐远远不能够满足我们客户或者我们需要的呼叫了指标的时候因为出现了这么一次情况,虽然没有给用户没有给公... 查看详情

终极版c语言-尹成-专题视频课程

...发检索、文件重定向、多线程同步、进程通讯、黑客劫持技术、网络安全、加密解密,以及各种精彩的小项目等,非常适合大家学习。讲课生动风趣、深入浅出,全套视频内容充实。课程收益    手把手教你从菜鸟到C++语言高... 查看详情

终极版c语言-尹成-专题视频课程

...发检索、文件重定向、多线程同步、进程通讯、黑客劫持技术、网络安全、加密解密,以及各种精彩的小项目等,非常适合大家学习。讲课生动风趣、深入浅出,全套视频内 查看详情

分布式技术专题「系统服务优化系列」web应用服务的性能指标优化开发指南(jvm篇)(代码片段)

JVM优化机制JIT编译器相关的优化对JVM性能影响最大的是编译器,选择编译器是运行java程序首先要做的选择之一。热点编译的概念对于程序来说,通常只有一部分代码被经常执行,这些关键代码被称为应用的热点,... 查看详情

分布式技术专题「系统服务优化系列」web应用服务的性能指标优化开发指南(基础篇)(代码片段)

前提概要针对于Web应用系统是现在计算机领域里面最常见的信息载体了,整个服务并不是一个孤立的个体,一个较为简单的Web应用主要是由前端视图页面、后端系统支撑、应用服务器、负载代理服务器、数据库等等其他... 查看详情

redis实战专题「性能监控系列」全方位探索redis的性能监控以及优化指南

Redis基本简介Redis是一个开源(BSD许可)、内存存储的数据结构服务器,可用作数据库,高速缓存和消息队列代理。它支持字符串、哈希表、列表、集合、有序集合等数据类型。内置复制、Lua脚本、LRU收回、事务以及不同级别磁... 查看详情

[mui专题]优化标题栏

本文首发于微信公众号:"算法与编程之美",欢迎关注,及时了解更多此系列文章。前言三步搭建MUI页面主框架法包括新建含mui的HTML文件、输入mheader(标题栏)、输入mbody(主体)。一个特色鲜明MUI界面无疑是能够吸引用户的关... 查看详情

unity性能优化专题—腾讯牛人分享经验(难度1推荐3)

...大小、unity怎么节省空间这里从三个纬度来分享下内存的优化经验:代码层面、贴图层面、框架设计层面。一.代码层面。1.foreach。M 查看详情

redis实战专题「性能监控系列」全方位探索redis的性能监控以及优化指南(代码片段)

Redis基本简介Redis是一个开源(BSD许可)、内存存储的数据结构服务器,可用作数据库,高速缓存和消息队列代理。它支持字符串、哈希表、列表、集合、有序集合等数据类型。内置复制、Lua脚本、LRU收回、事务以... 查看详情

应用服务性能优化-异步

1同步和异步,阻塞和非阻塞同步和异步关注的是结果消息的通信机制同步:同步的意思就是调用方需要主动等待结果的返回异步:异步的意思就是不需要主动等待结果的返回,而是通过其他手段比如,状态通知,回... 查看详情

网站前端性能优化终极指南(代码片段)

性能黄金法则:80-90%的终端用户响应时间花在下载前端,即页面上的所有组件:img、stylesheets、scripts等1.缩小HTML、CSS和JavaScript减少资源意味着从HTML、JavaScript和CSS中删除不必要的、不需要加载的字符,如空白字符、换行字符... 查看详情

网站前端性能优化终极指南(代码片段)

性能黄金法则:80-90%的终端用户响应时间花在下载前端,即页面上的所有组件:img、stylesheets、scripts等1.缩小HTML、CSS和JavaScript减少资源意味着从HTML、JavaScript和CSS中删除不必要的、不需要加载的字符,如空白字符、换行字符... 查看详情

java并发专题之三java线程同步

  从JDK5引入CAS原子操作,但没有对synchronized关键字做优化,而是增加了J.U.C.concurrent,concurrent包有更好的性能;从JDK6对synchronized的实现机制进行了较大调整,包括使用JDK5引进的CAS自旋之外,还增加了自适应的CAS自旋、锁消除... 查看详情

vc++进程间的通信

...使用volatile修饰符,它告诉编译器无需对该变量作任何的优化,即无需将它放到一个寄存器中,并且该值可被外部改变。如果线程间所需传递的信息较复杂,我们可以定义一个结构,通过传递指向该结构的指针进行传递信息。使... 查看详情

easyswoole+elasticsearch打造高性能小视频服务系统

...能测试报告,带你深入理解性能测试的真谛。第4章玩转高性能消息队列服务本章讲解消息队列、学习消息队列的原因、常用消息队列介绍以及使用场景解刨以及利用easySwoole打造高性能消息队列服务。第5章小视频服务平台-前后... 查看详情

ios进阶开发-ios性能优化-关东升-专题视频课程

iOS进阶开发-iOS性能优化—3316人已学习课程介绍        介绍了性能优化方法,其中包括内存优化、资源文件优化、延迟加载、持久化优化、使用可重用对象和并发访问等。这些内容都是非常重要的,... 查看详情

终极版c语言-尹成-专题视频课程

...发检索、文件重定向、多线程同步、进程通讯、黑客劫持技术、网络安全、加密解密,以及各种精彩的小项目等,非常适合大家学习。讲课生动风趣、深入浅出,全套视频内容充实。课程收益    手把手教你从菜鸟... 查看详情