rabbitmq学习(下)——发布确认高级幂等性优先级惰性和rabbitmq集群(代码片段)

AC_Jobim AC_Jobim     2022-12-19     366

关键词:

一、发布确认高级

在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式

  • confirm 确认模式
  • return 退回模式

rabbitmq 整个消息投递的路径为producer—>rabbitmq broker—>exchange—>queue—>consumer

  • 消息从 producer 到 exchange 则会返回一个confirmCallback
  • 消息从 exchange–>queue 投递失败则会返回一个returnCallback

我们将利用这两个 callback 控制消息的可靠性投递


消息的可靠投递小结:

  • 通过spring.rabbitmq.publisher-confirm-type属性设置发布确认的类型

    • NONE 值是禁用发布确认模式,是默认值
    • CORRELATED 值是发布消息成功到交换器后会触发回调方法
    • SIMPLE 值经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法,其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker;

    使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理。

  • 设置ConnectionFactory的publisher-returns=“true” 开启退回模式

    使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到queue失败后,如果设置了rabbitTemplate.setMandatory(true)参数,则会将消息退回给producer。并执行回调函数returnedMessage。

1.1 confirm 确认模式

代码架构图:

  1. 配置文件,设置spring.rabbitmq.publisher-confirm-type=correlated

    spring.rabbitmq.host=192.168.2.4
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=admin
    spring.rabbitmq.password=123
    # 发布消息成功到交换器后会触发回调方法
    spring.rabbitmq.publisher-confirm-type=correlated
    
  2. 添加配置类

    /**
     * 配置类,发布确认(高级)
     */
    @Configuration
    public class ConfirmConfig 
    
        //交换机
        public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
        //队列
        public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
        //RoutingKey
        public static final String CONFIRM_ROUTING_KEY = "key1";
    
        //声明交换机
        @Bean
        public DirectExchange confirmExchange() 
            return new DirectExchange(CONFIRM_EXCHANGE_NAME);
        
    
        //声明队列
        @Bean
        public Queue confirmQueue() 
            return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
        
    
        //绑定
        @Bean
        public Binding queueBindingExchange(
                @Qualifier("confirmQueue") Queue confirmQueue,
                @Qualifier("confirmExchange") DirectExchange confirmExchange) 
            return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
        
    
    
    
  3. 消息生产者

    /**
     * 开始发消息,测试确认
     */
    @Slf4j
    @RestController
    @RequestMapping("/confirm")
    public class ProducerController 
    
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        //发消息,http://localhost:8080/confirm/sendMessage/你好
        @RequestMapping("/sendMessage/message")
        public void sendMessage(@PathVariable("message") String message) 
            CorrelationData correlationData = new CorrelationData("1");
            String routingKey1 = "key1";
            rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
                    routingKey1,message,correlationData);
            log.info("发送消息内容:,routingKey:", message,routingKey1);
    
            CorrelationData correlationData2 = new CorrelationData("2");
            String routingKey2 = "key2";
            rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
                    routingKey2,message,correlationData2);
            log.info("发送消息内容:,routingKey:", message,routingKey2);
        
    
    
  4. 消息生产者的回调接口(重点)

    @Slf4j
    @Component
    public class MyCallBack implements RabbitTemplate.ConfirmCallback 
    
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        @PostConstruct
        public void init() 
            //给rabbitTemplate对象注入ConfirmCallback回调对象
            rabbitTemplate.setConfirmCallback(this);
        
    
        /**
         * 交换机不管是否收到消息的一个回调方法
         * 1.发消息 交换机收到了
         *  1.1 correlationData 保存回调消息的ID及相关信息,注意该参数默认为null,其值需要发送方发送消息的时候进行设置
         *  1.2 交换机收到消息 true
         *  1.3 cause:null
         * 2.发消息,交换机接收失败了
         *  2.1 correlationData 保存回调消息的ID及相关信息
         *  2.2 交换机未收到消息 false
         *  2.3 cause:失败的原因
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) 
            String id = correlationData != null ? correlationData.getId() : "";
            if(ack)
                log.info("交换机已经收到 id 为:的消息",id);
            else
                log.info("交换机还未收到 id 为:消息,由于原因:",id,cause);
            
        
    
    
  5. 执行测试

    访问:http://localhost:8080/confirm/sendMessage/你好

    可以看到,发送了两条消息,第一条消息的 RoutingKey 为 “key1”,第二条消息的 RoutingKey 为 “key2”,两条消息都成功被交换机接收,也收到了交换机的确认回调,但消费者只收到了一条消息,因为第二条消息的 RoutingKey 与队列的 BindingKey 不一致,也没有其它队列能接收这个消息,所有第二条消息被直接丢弃了

    丢弃的消息交换机是不知道的,需要解决告诉生产者消息传送失败


    将生产者的代码改成下面的样子:(设置发送到不存在的交换机)

    //发消息,http://localhost:8080/confirm/sendMessage/你好
    @RequestMapping("/sendMessage/message")
    public void sendMessage(@PathVariable("message") String message) 
        CorrelationData correlationData = new CorrelationData("1");
        String routingKey1 = "key1";
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME+"1",
                routingKey1,message,correlationData);
        log.info("发送消息内容:,routingKey:", message,routingKey1);
    
    

    访问:http://localhost:8080/confirm/sendMessage/你好

    可以看到当发送到错误的交换机时回调方法还是执行的。

1.2 return 退回模式

在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。

所以需要通过设置ConnectionFactory的publisher-returns=“true” 开启退回模式,就可以在当消息传递过程中不可达目的地时将消息返回给生产者。

  1. 配置文件,设置spring.rabbitmq.publisher-returns=true

    spring.rabbitmq.host=192.168.2.4
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=admin
    spring.rabbitmq.password=123
    # 发布消息成功到交换器后会触发回调方法
    spring.rabbitmq.publisher-confirm-type=correlated
    # 开启时,当消息发送不出去的时候会回退消息
    spring.rabbitmq.publisher-returns=true
    
  2. 修改回调接口

    @Slf4j
    @Component
    public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback 
    
        @Autowired
        RabbitTemplate rabbitTemplate;
        
    	//依赖注入 rabbitTemplate 之后再设置它的回调对象
        @PostConstruct
        public void init() 
            //给rabbitTemplate对象注入ConfirmCallback回调对象
            rabbitTemplate.setConfirmCallback(this);
            //给rabbitTemplate对象注入ReturnsCallback回调对象
            rabbitTemplate.setReturnsCallback(this);
        
    
        /**
         * 交换机不管是否收到消息的一个回调方法
         * 1.发消息 交换机收到了
         *  1.1 correlationData 保存回调消息的ID及相关信息,注意该参数默认为null,其值需要发送方发送消息的时候进行设置
         *  1.2 交换机收到消息 true
         *  1.3 cause:null
         * 2.发消息,交换机接收失败了
         *  2.1 correlationData 保存回调消息的ID及相关信息
         *  2.2 交换机未收到消息 false
         *  2.3 cause:失败的原因
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) 
            String id = correlationData != null ? correlationData.getId() : "";
            if(ack)
                log.info("交换机已经收到 id 为:的消息",id);
            else
                log.info("交换机还未收到 id 为:消息,由于原因:",id,cause);
            
        
    
        //可以在当消息传递过程中不可达目的地时将消息返回给生产者
        //只有不可达目的地的时候,才进行回退
        @Override
        public void returnedMessage(ReturnedMessage returnedMessage) 
            log.info("消息:被服务器退回,退回原因:, 交换机是:, 路由 key:",
                    new String(returnedMessage.getMessage().getBody()),returnedMessage.getReplyText(),
                    returnedMessage.getExchange(),
                    returnedMessage.getRoutingKey());
        
    
    
  3. 测试执行

    访问:http://localhost:8080/confirm/sendMessage/你好

1.3 备份交换机

  • 设置 mandatory 参数会增加生产者的复杂性,需要添加处理这些被退回的消息的逻辑。如果既不想丢失消息,又不想增加生产者的复杂性,该怎么做呢?

  • 在 RabbitMQ 中,有一种备份交换机的机制存在,可以很好的应对这个问题。

  • 备份交换器是为了实现没有路由到队列的消息,声明交换机的时候添加属性alternate-exchange,声明一个备用交换机,为了方便使用一般声明为fanout类型,这样交换机收到路由不到队列的消息就会发送到备用交换机,进而发送到绑定的备份队列中。当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警。

代码架构图:

  1. 在上面代码的基础上,修改配置类

    @Configuration
    public class ConfirmConfig 
    
        //交换机
        public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
        //队列
        public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
        //RoutingKey
        public static final String CONFIRM_ROUTING_KEY = "key1";
        //备份交换机
        public static final String BACKUP_EXCHANGE_NAME = "backup_exchange";
        //备份队列
        public static final String BACKUP_QUEUE_NAME = "backup_queue";
        //报警队列
        public static final String WARNING_QUEUE_NAME = "warning_queue";
    
        //声明交换机,并设置该交换机的备份交换机
        @Bean
        public DirectExchange confirmExchange() 
            return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME)
                    .durable(true)
                    .withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME)//设置该交换机的备份交换机
                    .build();
        
    
        //声明队列
        @Bean
        public Queue confirmQueue() 
            return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
        
    
        //绑定
        @Bean
        public Binding queueBindingExchange(
                @Qualifier("confirmQueue") Queue confirmQueue,
                @Qualifier("confirmExchange") DirectExchange confirmExchange) 
            return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
        
    
    
        //备份交换机
        @Bean
        public FanoutExchange backupExchange() 
            return new FanoutExchange(BACKUP_EXCHANGE_NAME);
        
    
        //备份队列
        @Bean
        public Queue backupQueue() 
            return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
        
    
        //报警队列
        @Bean
        public Queue warningQueue() 
            return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
        
    
        //绑定,备份队列和备份交换机绑定
        @Bean
        public Binding backupQueueBindingBackupExchange(
                @Qualifier("backupQueue") Queue backupQueue,
                @Qualifier("backupExchange") FanoutExchange backupExchange) 
            return BindingBuilder.bind(backupQueue).to(backupExchange);
        
    
        //绑定,报警队列和备份交换机绑定
        @Bean
        public Binding warningQueueBindingBackupExchange(
                @Qualifier("warningQueue") Queue warningQueue,
                @Qualifier("backupExchange") FanoutExchange backupExchange) 
            return BindingBuilder.bind(warningQueue).to(backupExchange);
        
    
    
  2. 报警消费者

    /**
     * 报警消费者
     */
    @Slf4j
    @Component
    public class WarningConsumer 
    
        @RabbitListener(queues = ConfirmConfig.WARNING_QUEUE_NAME)
        public void receiveWarningMsg(Message message) 
            String msg = new String(message.getBody());
            log.error("报警发现不可路由消息:", msg);
        
    
    
  3. 执行测试

访问: http://localhost:8080/confirm/sendMessage/你好,可以看到没有被路由到队列的消息被报警消费者消费了。

上面的代码mandatory 参数与备份交换机同时开启了。但最后的结果是备份交换机发现了不可路由的消息,而回退方法没有被调用。
即:备份交换机优先级高

二、幂等性、优先级、惰性

2.1 幂等性

  • 幂等性的实质是:对于一个资源,不管你请求一次还是请求多次,对该资源本身造成的影响应该是相同的,不能因为重复相同的请求而对该资源重复造成影响。注意关注的是请求操作对资源本身造成的影响,而不是请求资源返回的结果。就是保证同一条消息不会重复或者重复消费了也不会对系统数据造成异常。

RabbitMQ的幂等性

  • 拿RabbitMQ来说的话,消费者在消费完成一条消息之后会向MQ回复一个ACK(可以配置自动ACK或者手动ACK) 来告诉MQ这条消息已经消费了。假如当消费者消费完数据后,准备回执ACK时,系统挂掉了,MQ是不知道该条消息已经被消费了。所以重启之后MQ会再次发送该条消息,导致消息被重复消费,如果此时没有做幂等性处理,可能就会导致数据错误等问题。

如何避免消息的重复消费问题?

全局唯一ID + Redis

  • 生产者在发送消息时,为每条消息设置一个全局唯一的messageId,消费者拿到消息后,使用setnx命令,将messageId作为key放到redis中:setnx(messageId,1),若返回1,说明之前没有消费过,正常消费;若返回0,说明这条消息之前已消费过,抛弃。

  • setnx命令,若给定的key不存在,执行set操作,返回1,若给定的Key已存在,不做任何操作,返回0。

生产者代码:

public void sendMessageIde() 
    MessageProperties properties = new MessageProperties();
    properties.setMessageId(UUID.randomUUID().toString());
    Message message = new Message("Hello RabbitMQ".getBytes(), properties);
    rabbitTemplate.convertAndSend("durable-exchange", "rabbit.long.yuan", message);

消费者代码:

@RabbitListener(queues = "durable-queue")
@RabbitHandler
public void processIde(Message message, Channel channel) throws IOException 
 
    if (stringRedisTemplate.opsForValue().setIfAbsent(message.getMessageProperties().getMessageId(),"1"))
        // 业务操作...
        System.out.println("消费消息:"+ new String(message.getBody(), "UTF-8"));
 
        // 手动确认
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    

2.2 优先级

  • 在具体业务中可能会遇到一些要提前处理的消息,比如普通客户的消息按先进先出的顺序处理,Vip客户的消息要提前处理。

  • 对于上面的情况,就需要使用到优先级队列,即具有更高优先级的队列具有较高的优先权,优先级高的消息具备优先被消费的特权。在RabbitMQ中,消息优先级的实现方式是:在声明queue时设置队列的x-max-priority属性,然后在publish消息时,设置消息的优先级即可。

注意:实现优先级队列,队列需要设置为优先级队列,消息需要设置消息的优先级,消费者需要等待消息已经发送到队列中才去消费。

设置优先级队列:

  1. 通过RabbitMQ管理界面配置队列的优先级属性

  2. 通过代码去实现

    Map<String,Object> args = new HashMap<String,Object>();
    args.put("x-max-priority", 10);
    channel.queueDeclare("queue_priority", true, false, false, args);
    

配置了队列优先级的属性之后,可以在管理页面看到Pri的标记:

发送的消息中设置消息本身的优先级:

AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(5).build();
channel.basicPublish("exchange_priority","rk_priority",properties,("messages").getBytes());

实战:发送10条消息,其中第5条为高优先级消息

  1. 生产者
public class PriorityProducer 
    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception 
        Channel channel = RabbitMqUtils.getChannel();
        
        Map<String, Object> arguments = new HashMap<>();
        //官方允许0-255之间,此处设置10,表示允许优先级范围为0-10,不要设置过大,浪费CPU和内存
        arguments.put("x-max-priority",10);
        channel.queueDeclare(QUEUE_NAME,false,false,false,arguments);

        for (int i = 1;i < 11 ;i++) 
            String message = "info" + i;
            if(i == 5) 
                //发送优先级为5的消息,数字越高,优先级越高
                AMQP.BasicProperties properties 查看详情  

rabbitmq--高级特性(代码片段)

在上一篇文章讲解MQ消息可靠性投递和幂等性中有提到confirm机制的重要性,现在更相信的说明一下一、Confirm机制  Confirm就是消息确认,当Producer发送消息,如果Broker收到消息,会回复一个应答,我们可以以此来确认消息是否... 查看详情

rabbitmq---幂等性,优先级队列,惰性队列(代码片段)

RabbitMQ幂等性概念解决思路优先级队列使用场景如何给队列添加优先级惰性队列使用场景两种模式幂等性概念用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。举个最简单的例子... 查看详情

rabbitmq消费者保证幂等性(代码片段)

...oot-starter-amqp</artifactId></dependency>application.yml配置rabbitmq连接spring:rabbitmq:host:127.0.0.1port:5672username:guestpassword:guestvirtual-host:/guoVirtualHostlistener:simple:retry:#开启消费者(出现异常)会进行重试enabled:true#默认重试无限次&... 查看详情

第201天学习打卡(幂等性)

接口幂等性要保证原子性提交订单需要幂等性B站学习网址:全网最强电商教程《谷粒商城》对标阿里P6/P7,40-60万年薪_哔哩哔哩_bilibili 查看详情

rabbitmq学习--发布确认高级(代码片段)

背景在生产环境中由于一些不明原因,导致rabbitmq重启,在RabbitMQ重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。于是,我们开始思考,如何才能进行RabbitMQ的消息可靠投递呢?特... 查看详情

rabbitmq学习--发布确认高级(代码片段)

背景在生产环境中由于一些不明原因,导致rabbitmq重启,在RabbitMQ重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。于是,我们开始思考,如何才能进行RabbitMQ的消息可靠投递呢?特... 查看详情

分布式-幂等性解决方案

...系。常见幂等性问题重复下单在App中下订单的时候,点击确认之后,没反应,就又点击了几次。在这种情况下,如果无法保证该接口的幂等性,那么将会出现重复下单问 查看详情

phprabbitmq的开发体验(代码片段)

一、前言在上一篇rabbitmq开发体验(二),我们正式的用我们php来操作消息队列的生产和消费,并利用的rabbitmq的高级特性来进行ack确认机制,幂等性,限流机制,重回机制,ttl,死信队列(相当于失败消息的回收站)。已经可以正... 查看详情

简单理解幂等性

一.什么是幂等性  幂等性:客户端以相同的方式重复调用服务就应该产生相同的结果。二.为什么需要幂等性  例如:下订单的时候,点击支付之后没反应,就多点击了几次。在这种情况下,如果接口不是幂等性,那么就会... 查看详情

rabbitmq学习笔记(代码片段)

目录一、引⾔二、RabbitMQ介绍三、RabbitMQ安装四、RabbitMQ架构1.官⽅的简单架构图2.RabbitMQ的完整架构图3.查看图形化界⾯并创建⼀个VirtualHost五、RabbitMQ的队列模式1.RabbitMQ的通讯⽅式2.HelloWorld模式-简单队列模式3.work队列模式:... 查看详情

rabbitmq系列消息幂等性处理(代码片段)

一、springboot整合rabbitmq我们需要新建两个工程,一个作为生产者,另一个作为消费者。在pom.xml中添加amqp依赖:<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>... 查看详情

接口的幂等性(代码片段)

最近跟朋友聊起这个话题,想深入了解下,于是学习总结,记录下来,此文章参考以下博客综合而来表示感谢:http://blog.brucefeng.info/post/api-idempotenthttp://825635381.iteye.com/blog/2276077https://www.cnblogs.com/leechenxiang/p/6626629.html1.接口调用存在的问... 查看详情

编程思想之幂等性一编程之道

...的。今天看到还有这样一篇稿文,那就整理下分享给大家学习!编程思想之幂等性  什么是幂等性  既然幂等性源于数学,那我就使用数学公式来表示,即可一目了然!  f(f(x))=f(x)  显然,从上面的二元函数可以看出,无... 查看详情

消息队列rabbitmq原理消息队列保证幂等性,消息丢失,消息顺序性,以及处理消息队列消息积压问题

...ff0c;只不过队列中存放的内容是message而已常见的消息队列RabbitMqActiveMqZeroMqkafka等;为什么使用RabbitMq?RabbitMQ是一个实现了AMQP&#x 查看详情

rabbitmq:第二章:spring整合rabbitmq(简单模式,广播模式,路由模式,通配符模式,消息可靠性投递,防止消息丢失,ttl,死信队列,延迟队列,消息积压,消息幂等性)(代码(代码片段)

系列文章目录RabbitMQ:第一章:6种工作模式以及消息确认机制(理论与代码相结合)RabbitMQ:第二章:Spring整合RabbitMQ(简单模式,广播模式,路由模式,通配符模式,消息可靠性投递... 查看详情

rabbitmq高级进阶(2021.05.29)(代码片段)

前篇回顾:RabbitMQ入门学习前篇回顾:springboor整合RabbitMQ1.RabbitMQ高级特性1.1消息的可靠投递在使用RabbitMQ的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ为我们提供了两种方式用来控制消息... 查看详情

分布式服务的幂等性设计(代码片段)

...提供查询订单状态的接口,上游在下单之前先进行查询,确认该笔订单并没有成功支付后,再重复进行下单操作。一般来说,服务本身需要自己保证幂等性,而不应该将幂等性交给上游的调用方来做。唯一ID就上面的幂等性下单... 查看详情