消息队列rabbitmq高级特性(代码片段)

baidawei baidawei     2022-12-08     535

关键词:

一.消息的可靠投递

在使用RabbitMq的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败的场景。RabbitMQ为我们提供了两种方式用来控制消息的投递可靠性

rabbitMQ 整个消息投递过程为:

 producer ->  rabbitMQ broker -> exchange -> queue ->consumer

  1.confirm 确认模式

    消息从producer 到exchange 会返回一个 confirmCallback

  2.return 退回模式

    消息从exchange到queue投递失败则会返回一个returnCallbak

Confirm确认模式:

1.在配置文件中开启确认模式 publisher-confimrs :true

spring:
  rabbitmq:
    host: 10.211.55.4
    virtual-host: local
    port: 5672
    username: admin
    password: admin
    publisher-confirms: true

2.在rabbitTemplate定义ConfirmCallBack回调函数

@RequestMapping("/producer")
    public void producer()
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() 
            @Override
            /**
             * ack exchange交换机是否收到了消息
             * cause 失败原因
             */
            public void confirm(CorrelationData correlationData, boolean ack, String cause) 
                if(ack)
                    System.out.println("confirm方法 成功了");
                else
                    System.out.println("confirm方法 失败了: " + cause);
                
            
        );

        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"boot.test","spring boot rabbit mq");
    

 

Return退回模式:

1.开启退回模式

spring:
  rabbitmq:
    host: 10.211.55.4
    virtual-host: local
    port: 5672
    username: admin
    password: admin
    publisher-returns: true

2.设置returnCallBack

@RequestMapping("/return")
    public void returncallback()
        //设置交换机处理失败消息的模式
        rabbitTemplate.setMandatory(true);

        //设置return call back
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() 
            /**
             *
             * @param message   消息对象
             * @param replyCode 错误码
             * @param replyText 错误信息
             * @param exchange  交换机
             * @param routingKey 路由键
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) 
                System.out.println("return 执行了....");

                System.out.println(message);
                System.out.println(replyCode);
                System.out.println(replyText);
                System.out.println(exchange);
                System.out.println(routingKey);

                //处理
            
        );

        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"zxcasd","return call back");
    

 

二、Consumer ACK 确认机制

表示消费端收到消息后到确认方式

有三种确认方式:

1.自动确认 acknowledge="none" 默认

2.手动确认 acknowledge="manual"

3.根据异常情况确认: acknowledge="auto"

1.开启手动确认

spring:
  rabbitmq:
    host: 10.211.55.4
    virtual-host: local
    port: 5672
    username: admin
    password: admin
    publisher-returns: true
    listener:
      simple:
        acknowledge-mode: manual

2.成功调用channel.basicAck   异常调用channel.basicNack

@Component
public class RabbitMQListener implements ChannelAwareBatchMessageListener 

    @Override
    @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
    public void onMessage(Message message, Channel channel) throws Exception 
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try
            System.out.println(new String(message.getBody()));
            int a = 1/0;
            //手动确认
            channel.basicAck(deliveryTag,true);
        catch (Exception ex)
            //发生异常 拒绝确认
            //第三个参数,重回队列
            channel.basicNack(deliveryTag,true,true);
        
    

 

三、消费端 限流

技术图片

1.确保ack机制为手动确认

2.设置prefetch=1 每次从rabbitmq取几条,ack确认后再取下一条

server:
  port: 9999
spring:
  rabbitmq:
    host: 10.211.55.4
    virtual-host: local
    port: 5672
    username: admin
    password: admin
    publisher-returns: true
    listener:
      simple:
        acknowledge-mode: manual
      direct:
        prefetch: 1

 

四、TTL

Time To Live 存活时间/过期时间,当消息到达一定时间后,还没有被消费,会被自动清除。

RabbitMQ可以对消息设置过期时间,也可以对整个队列设置过期时间。

1.queue 队列设置过期时间

@Configuration
public class RabbitMQConfig 
    public static final String EXCHANGE_NAME = "exchange_ttl";
    public static final String QUEUE_NAME = "queue_ttl";
    public static final String Routing_Key = "ttl.#";

    // 1.交换机
    @Bean("bootExchange")
    public Exchange bootExchange() 
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    


    //2.Queue队列
    @Bean("bootQueue")
    public Queue bootQueue() 
        return QueueBuilder.durable(QUEUE_NAME).withArgument("x-message-ttl", 5000).build();
    

    //3.绑定交换机和队列
    @Bean
    public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange) 
        return BindingBuilder.bind(queue).to(exchange).with(Routing_Key).noargs();
    

2.消息设置过期时间

@RequestMapping("/producer")
    public void producer()

        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() 
            @Override
            public Message postProcessMessage(Message message) throws AmqpException 
                message.getMessageProperties().setExpiration("5000"); // 消息过期时间
                return message;
            
        ;

        for(int i = 0;i<10;i++)
            if(i>5)
                rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"ttl.test","spring boot rabbit mq"+i,messagePostProcessor);
            else
                rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"ttl.test","spring boot rabbit mq"+i);
            
        
    

 

五、死信队列

DLX、Dead Letter Exchange (死信交换机),当消息成为Dead Messag后,可以被重新发送到另一台交换机中。

技术图片

消息成为死信到三种情况:

1.队列消息长度达到限制,

2.消费者拒接消费信息,basicNack/basicReject 并且不把消息重新放入原目标队列,参数requeue=false。

3.原队列存在消息过期设置,消息到达超时时间未被消费

 

设定参数 x-dead-letter-exchange 和 x-dead-letter-routing-key

 

消息队列专题(高级特性篇):rabbitmq如何保证消息的可靠性投递传输和消费(代码片段)

我们使用RabbitMQ进行消息处理一般都需要保证消息的可靠性,而消息的可靠性又可以根据消息的不同处理阶段分为可靠性投递、传输和消费。本篇博客将针对这三种情况介绍相应的设计方案,首先来看一下消息的可靠性投... 查看详情

消息队列专题(高级特性篇):rabbitmq如何保证消息的可靠性投递传输和消费(代码片段)

我们使用RabbitMQ进行消息处理一般都需要保证消息的可靠性,而消息的可靠性又可以根据消息的不同处理阶段分为可靠性投递、传输和消费。本篇博客将针对这三种情况介绍相应的设计方案,首先来看一下消息的可靠性投... 查看详情

rabbitmq高级进阶(2021.05.29)(代码片段)

前篇回顾:RabbitMQ入门学习前篇回顾:springboor整合RabbitMQ1.RabbitMQ高级特性1.1消息的可靠投递在使用RabbitMQ的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ为我们提供了两种方式用来控制消息... 查看详情

rabbitmq——rabbitmq的高级特性(ttl死信队列延迟队列优先级队列rpc)

摘要本博文将介绍数据可靠性的一些细节,并展示RabbitMQ的几种已具备或衍生的高级特性,包括TTL、死信队列、延迟队列、优先级队列、RPC等,这些功能在实际使用中可以让某些应用的实现变得事半功倍。同时对源码进行详细的... 查看详情

rabbitmq高级特性之延迟队列通俗易懂超详细内含案例

RabbitMq高级特性之延迟队列消息进入队列后不能立即被消费,到达指定时间后才可被消费实现结合以下两种即可达到延迟队列RabbitMq高级特性之TTL过期时间RabbitMq高级特性之DLX死信队列延迟队列小结延迟队列指消息进入队列后,经过... 查看详情

rabbitmq--高级特性(代码片段)

在上一篇文章讲解MQ消息可靠性投递和幂等性中有提到confirm机制的重要性,现在更相信的说明一下一、Confirm机制  Confirm就是消息确认,当Producer发送消息,如果Broker收到消息,会回复一个应答,我们可以以此来确认消息是否... 查看详情

rabbitmq高级特性之ttl存活时间/过期时间通俗易懂超详细内含案例(代码片段)

RabbitMq高级特性之TTL存活时间/过期时间每条消息设置过期时间整个Queue队列设置过期时间前提完成RabbitMq高级特性之消费端限流一、每条消息设置过期时间1.更改ProducerTest.java文件importlombok.extern.slf4j.Slf4j;importorg.junit.Test;importorg.juni... 查看详情

rabbitmq消息队列笔记(代码片段)

...发布确认高级在生产环境中由于一些不明原因,导致RabbitMQ 重启,在RabbitMQ重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。如何才能进行RabbitMQ的消息可靠投递呢?特别是在比较极端 查看详情

rabbitmq消息队列笔记(代码片段)

...发布确认高级在生产环境中由于一些不明原因,导致RabbitMQ 重启,在RabbitMQ重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。如何才能进行RabbitMQ的消息可靠投递呢?特别是在比较极端 查看详情

rabbitmq✧消息队列(代码片段)

☣RabbitMQ✧消息队列☢什么是RabbitMQ?RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架... 查看详情

rabbitmq消息队列集群(代码片段)

RabbitMQMQ(MessageQueue,消息队列)是一款消息中间件,一般以集群方式部署,主要提供消息的接受和发送,实现各微服务之间的消息异步。集群原理rabbitmq是依据erlang的分布式特性(RabbitMQ底层是通过Erlang架构来实现的,所以rabbitm... 查看详情

rabbitmq消息队列集群配置(代码片段)

RabbitMQ是什么?MQ(MessageQueue,消息队列)消息中间件,一般以集群方式部署,主要提供消息的接受和发送,实现各微服务之间的消息同步。原理介绍rabbitmq是依据erlang的分布式特性(RabbitMQ底层是通过Erlang架构来实现的,所以rabbit... 查看详情

rabbitmq消息队列集群配置-1(代码片段)

RabbitMQ是什么?MQ(MessageQueue,消息队列)消息中间件,一般以集群方式部署,主要提供消息的接受和发送,实现各微服务之间的消息同步。原理介绍rabbitmq是依据erlang的分布式特性(RabbitMQ底层是通过Erlang架构来实现的,所以rabbit... 查看详情

rabbitmq---延迟队列,整合springboot(代码片段)

RabbitMQ---消息队列---下半部分延迟队列概念使用场景RabbitMQ中的TTL队列设置TTL消息设置TTL两者的区别整合springbootpom文件配置文件添加Swagger配置类队列TTL代码架构代码实现延时队列优化代码架构图实现Rabbitmq插件实现延迟队列安装... 查看详情

rabbitmq之发布确认高级回退消息备份交换机幂等性优先级队列惰性队列(代码片段)

...介        在生产环境中由于一些不明原因,导致rabbitmq重启,在RabbitMQ重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。于是,我们开始思考,如何才能进行RabbitMQ的消息可靠投递呢?特... 查看详情

rabbitmqamqp(高级消息队列协议)

目录RabbitMQAMQP(高级消息队列协议)MessageQueue简介概念基本组成场景及作用AMQP简介模型架构基础组件AMQP-RabbitMQ简介模型特性参考RabbitMQAMQP(高级消息队列协议)AMQP协议是MessageQueue消息队列的一种协议,RabbitMQ是基于AMQP协议实现的一... 查看详情

rabbitmq集群配置(代码片段)

RabbitMQ简介MQ(MessageQueue消息队列)是一种应用程序对应用程序的通信方法。引用程序通过读写出入队列的消息来通信而无需专用连接来连接他们。通常是用于诸如远程过程调用的技术。排队指的是应用程序通过队列来通信。队列的... 查看详情

初闻rabbitmq(代码片段)

RabbitMQMQ全称为MessageQueue,即消息队列,RabbitMQ是由erlang语言开发,基于AMQP(AdvancedMessageQueue高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通讯方式,消息队列在分布式系统应用非常广泛。应用场... 查看详情