exchanger详解(代码片段)

truestoriesavici01 truestoriesavici01     2022-12-02     528

关键词:

Exchanger详解

简介

当一个线程到达栅栏时,会检查是否有其他线程已经到达栅栏.
若没有,则该线程进入等待.
若有,则与等待的其他线程交换各自的数据,然后继续执行.

原理

  • 内部类Participant继承自ThreadLocal,用来保存线程本地变量Node.
  • Node存储用于单槽交换多槽交换的字段.

单槽位交换(slot exchange)

技术图片

流程:

  • 首先到达的线程:
    • slot字段指向自身的Node节点,表示槽位已被占用.
    • 该线程自旋一段时间.若经过一段时间自旋还是没有配对的线程到达,则进入阻塞.(自旋减少上下文切换的开销)
  • 后续到达的线程:
    • 此时槽位slot已被占用.则后续的线程将槽位slot清空,取出Node中的item作为交换的数据.
    • 后续的线程把自身的数据存入Node中的match字段中,并唤醒先到达的线程.
  • 先到达的线程被唤醒:
    • 检查match是否为空.不为空则退出自旋,将match中的数据返回.

多槽位交换(arena exchange)

触发机制:
在单槽位交换中,若:多个匹配线程竞争修改slot槽位,导致线程CAS修改slot失败,则初始化arena多槽位数组,后续的交换使用多槽位交换.

流程:

  1. 若槽不为空,则已有线程到达并等待.
    • 获取已到达先携带的数据.
    • 将当前线程携带的数据交换给已到达的线程.
    • 唤醒已到达的线程.
  2. 若槽位有效且为空.
    • CAS占用槽位成功.
    • 通过spin->yield->block的锁升级方式进行优化的等待其他线程到达.若有线程到达,则交换数据后返回交换后的数据.
    • 若没有等待配对的线程,则阻塞的线程.
  3. 无效的槽位,需要扩容.
    • 通过CAS方式对数组进行扩容.

注:
数组是连续的内存地址空间.多个slot会被加载到同一个缓存行上.
当一个slot改变时,导致该slot所在的缓存行上所有的数据都无效,需要重新从内存加载.

不同版本的差异

  • JDK5被设计为容量为1的容器,存放一个等待的线程.当另外一个线程到达时,交换数据后会清空容器.
  • JDK6后提供多个slot,增加并发执行的吞吐量.

示例

生产者-消费者模式

示例:

public class ProducerAndConsumer 
    @Test
    public void test() throws InterruptedException 
        Exchanger<Integer> exchanger = new Exchanger<>();
        new Thread(new Producer(exchanger)).start();
        new Thread(new Consumer(exchanger)).start();
        TimeUnit.SECONDS.sleep(20);
    


class Producer implements Runnable 
    private int[] array = new int[5];
    private final Exchanger exchanger;

    public Producer(Exchanger exchanger)
        this.exchanger = exchanger;
    

    @Override
    public void run() 
        for (int i = 0; i < 5; i++) 
            try 
                TimeUnit.SECONDS.sleep(2);
                array[i] = i;
                System.out.println("生产者生产的数据为: " + array[i]);
                int exchange = (int) exchanger.exchange(array[i]);
                System.out.println("生产者交换后的数据为: "+exchange);
                System.out.println("生产者的对应的数据是否改变: " + array[i]);
             catch (InterruptedException e) 
                e.printStackTrace();
            
        
    


class Consumer implements Runnable
    private int[] array = new int[5];
    private final Exchanger exchanger;

    public Consumer(Exchanger exchanger)
        this.exchanger = exchanger;
    
    @Override
    public void run() 
        int index = 0;
        while (true) 
            try 
                TimeUnit.SECONDS.sleep(1);
                System.out.println("消费者交换前的数据为: " + array[index]);
                int exchange = (int) exchanger.exchange(array[index]);
                index++;
                System.out.println("消费者获得的数据为: " + exchange);
                System.out.println("消费者交换后的数据: " + array[index]);
             catch (InterruptedException e) 
                e.printStackTrace();
            
        
    

注:

  • 交换数据后并不改变线程内部的数据.交换数据在于获取对方的数据.是否替换用来交换局部数据需要自行设定.

参考:






一行java代码实现两玩家交换装备并发编程(代码片段)

文章目录1Exchanger是什么2Exchanger详解3Exchanger应用1Exchanger是什么JDK1.5开始JUC包下提供的Exchanger类可用于两个线程之间交换信息。Exchanger对象可理解为一个包含2个格子的容器,通过调用exchanger方法向其中的格子填充信息,当... 查看详情

一行java代码实现两玩家交换装备并发编程(代码片段)

文章目录1Exchanger是什么2Exchanger详解3Exchanger应用1Exchanger是什么JDK1.5开始JUC包下提供的Exchanger类可用于两个线程之间交换信息。Exchanger对象可理解为一个包含2个格子的容器,通过调用exchanger方法向其中的格子填充信息,当... 查看详情

rabbitmq消息的参数详解(代码片段)

...数设置.发布消息时的完整入参是这样的:channel.BasicPublish(exchange:"test_exchange",routingKey:"",mandatory:false,basicProperties:null,body:Encoding.Default.GetBytes(msg));下面一一解释:exchange:交换机名称routingKey:路由键路由键的设置跟交换机的类型有关.如... 查看详情

amqp协议详解(代码片段)

...Connection(连接)Channel(信道)Broker(中间件)VirtualHost(虚拟主机)Exchange(交换机)默认交换机直连交换机扇型交换机主题交换机头交换机交换机小结Binding(绑定)RoutingKey(路由键)Queue(队列)Consum 查看详情

rabbitmq-amqp模型详解二(代码片段)

...P得流程,以及介绍VhostHost、连接 、通道、RoutingKey、exchange、绑定、message等组件;这篇文章会继续介绍AMQP中重要的概念,生产路由不可达,以及可靠的发布事务机制,发布确认机制,消费者独占等 查看详情

rabbitmq-amqp模型详解(代码片段)

...篇文章会解析RabbitMQ的核心概念,解析包括connection、exchange、queue、中间代理等来解析amqp模型。概念RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端。用于在分布式系统中存储转发消息,... 查看详情

exchange安全和维护(代码片段)

...个默认角色组中具备管理权限自定义角色组用户角色策略Exchange审核执行特殊&权限过大的操作时,默认Exchange会记录该操作动作,也可以自定义为指定用户邮箱审核Exchange操作邮箱审核:针对敏感用户邮箱Exchange变更定义变更... 查看详情

(九)rabbitmq交换机(exchange)(代码片段)

交换机Exchange1、交换机1.1.Exchanges概念1.2.Exchanges的类型1.3.无名exchange(默认交换机)2、临时队列3、绑定(bindings)4、Fanout(发布/订阅)5、Directexchange、6、Topics在这里插入图片描述1、交换机1.1.Exchanges概念R 查看详情

exchanges交换机(代码片段)

文章目录Exchanges交换机概念交换机类型无名exchange交换机绑定(bindings)临时队列Fanout扇出类型交换机(也叫作发布订阅模式)概念两个接收者代码发送者代码测试Direct直接交换机类型概念两个接收者代码发送者代码测试Exchanges交换机... 查看详情

从零开始-exchange和skypeforbusiness部署-exchange安装(代码片段)

软件准备:Exchange服务器必须安装NETFramework4.7.1https://www.microsoft.com/zh-CN/download/details.aspx?id=56116VisualC++RedistributablePackagesforVisualStudio2013https://www.microsoft.com/zh-CN/download/details.a 查看详情

exchange2013和exchange2019共存部署和配置(代码片段)

背景Exchange2013生命周期将在2023年4月份终结。升级已经势在必行。而升级过程中会存在共存阶段。本篇尽量减少文章篇幅,介绍关键部分和遇到的疑问和问题该如何解决。共存方案Exchange2013需要升级到CU21之后方可和Ex2019共存,从... 查看详情

java并发(十六):并发工具类——exchanger(代码片段)

Exchanger(交换者)是一个用于线程间协作的工具类。Exchanger用于进行线程间的数据交换。它提供一个同步点,在这个同步点两个线程可以交换彼此的数据。这两个线程通过exchange方法交换数据,如果第一个线程先执行exchange方法... 查看详情

exchange2013cu20和exchange2016cu9补丁已经发布(代码片段)

微软已经在2018年3月20日发布了2018年Exchange2013/2016补丁更新。关于微软的补丁发行说明可以参考:https://blogs.technet.microsoft.com/exchange/2018/03/20/released-march-2018-quarterly-exchange-updates/具体的最新补丁下载地址:Exchange2016CU9下载地址:http 查看详情

exchange体系结构的变化(代码片段)

微软从Exchange2007开始,为了提高服务器的安全性及稳定性,开始将服务器分为五个角色来部署。到了Exchange2010,依然使用这种模式来进行部署,下图说明了一个已部署Exchange2010所有服务器角色的域:Exchange2010包括以下服务器角色... 查看详情

springboot2.x集成rabbitmq详解topic模式(代码片段)

Topic模式重点是理解交换器(exchange)、路由键(routingkey)、队列名(queuename)三者之间的绑定关系。topic发送方:发送方关注参数主要有三个交换器(exchange)路由键(routingkey)和消息topic消费方消费方关注点是队列的名字定义... 查看详情

exchange2010刷新已断开连接的邮箱(代码片段)

Clean-MailboxDatabase(Exchange2010)是Exchange2013中的Update-StoreMailboxState刷新Exchange2010数据库中已断开连接的邮箱-Clean-Mailbox数据库针对特定数据库-Get-MailboxDatabase“Dbname”|Clean-mailboxdatabase对于所有数据库-Get-MailboxDatabas 查看详情

导出exchange2007/2010设置(代码片段)

从ExchangeServer2007&2010迁移到Exchange2013时,最好保留旧服务器的设置。可以使用以下脚本将设置导出到文本:mdC:ExchangeSettings$Commands=gcmget*virtual*$Commands+="Get-ReceiveConnector","Get-SendConnector","Get-ClientAccessServer"," 查看详情

exchange2016自定义messagetracking(代码片段)

Exchange默认日志流保存45天,根据实际需求,可更改下面命令是将,日志单文件设定为20MB、文件夹50G、保存天数365天Set-TransportServicexxxxx-MessageTrackingLogPath"D:ProgramFilesMicroSoftExchangeServerV15TransportRolesLogsMessageTracking"-Messa 查看详情