rabbitmq--高级特性(代码片段)

huigelaile huigelaile     2022-12-13     131

关键词:

在上一篇文章讲解MQ消息可靠性投递和幂等性中有提到confirm机制的重要性,现在更相信的说明一下

一、Confirm机制

  Confirm就是消息确认,当Producer发送消息,如果Broker收到消息,会回复一个应答,我们可以以此来确认消息是否成功送达,是保证

消息可靠性投递的核心保障

Producer代码如下,只需要修改Producer端,而Consumer端不需要修改

//4 指定我们的消息投递模式: 消息的确认模式
channel.confirmSelect();

//5 发送一条消息
String msg = "Hello RabbitMQ Send confirm message!";
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());

//6 添加一个确认监听
channel.addConfirmListener(new ConfirmListener() 
    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException 
        System.err.println("-------no ack!-----------");
    

    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException 
        System.err.println("-------ack!-----------");
    
);

结果:

-------ack!-----------

只要Producer能把消息发送给Broker,就会返回handlerAck中,返回到NAck的可能很小,例如MQ出现异常,queue的容量达到上限

二、Return消息机制

Return Listener用于处理一些不可路由的消息

Producer:

技术图片
public class Producer 

    public static void main(String[] args) throws Exception 
        //1 创建ConnectionFactory
        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost("/");
        factory.setHost("139.196.75.238");
        factory.setPort(5672);

        //2 获取Connection
        Connection connection = factory.newConnection();

        //3 通过Connection创建一个新的Channel
        Channel channel = connection.createChannel();

        String exchangeName = "exchange_topic";
        String routingKey = "fdasfdsafsadf4543453";

        //6 添加一个return监听
        channel.addReturnListener(new ReturnListener() 
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException 
                System.err.println("---------handle  return----------");
                System.err.println("replyCode: " + replyCode);
                System.err.println("replyText: " + replyText);
                System.err.println("exchange: " + exchange);
                System.err.println("routingKey: " + routingKey);
                System.err.println("properties: " + properties);
                System.err.println("body: " + new String(body));
            
        );
        //5 发送一条消息
        String msg = "Hello RabbitMQ Send confirm message!";
        channel.basicPublish(exchangeName, routingKey, true, null, msg.getBytes());

    
Producer Return

结果:

---------handle  return----------
replyCode: 312
replyText: NO_ROUTE
exchange: exchange_topic
routingKey: fdasfdsafsadf4543453
properties: #contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
body: Hello RabbitMQ Send confirm message!

注意:

  channel.basicPublish参数里面一定要把Mandatory设置为true,才能收到监听不可达的消息(创建exchange、routingKey不匹配等问题

,导致不可达),然后进行后续处理,如果为false,broker自动删除该消息,上面例子就是routingKey设置不匹配,Consumer的代码就不给了

三、消息端限流

限流一般无法从生产端,只能在消费端处理

在Consumer端设置:

channel.basicQos(0, 1, false);
channel.basicConsume(queueName, false, new MyConsumer(channel));

qos:

  服务质量保证,在非自动确认情况下,一定数目的消息没有确认,不进行消费新的消息,通过producer/consumer设置qos的值

channel.basicQos(prefetchSize, prefetch_count, global);

注意:

  prefetchSize和global,rabbitMQ没有实现,默认0表示对单条message的大小没有限制、false(非channel级别,consumer级别)

  channel.basicConsume中自动签收一定要设置成false

  prefetch_count表示一次给几条进行消费,直到返回ack,才能继续给prefetch_count条message

在MyConsumer中手动签收

public class MyConsumer extends DefaultConsumer 
    private Channel channel;
    public MyConsumer(Channel channel) 
        super(channel);
        this.channel = channel;
    

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException 
        System.err.println("-----------consume message----------");
        System.err.println("body: " + new String(body));
        channel.basicAck(envelope.getDeliveryTag(), false);
    

四、TTL

五、死信队列

未完待续。。。

 

rabbitmq高级特性之死信队列通俗易懂超详细内含案例(代码片段)

RabbitMq高级特性之死信队列又称死信交换机DLX当消息成为Deadmessage后,会重新发送到另一个交换机,这个交换机就是DLX消息成为死信的情况公有三种:队列消息长度达到限制消费者拒接消费消息basicNack/basicReject,并且不把消息重新放回... 查看详情

rabbitmq高级特性之消费端限流通俗易懂超详细内含案例(代码片段)

RabbitMq高级特性之消费端限流一丶首先部署SpringBoot框架完成SpringBoot整合RabbitMq中的Topic通配符模式二丶在resource资源文件夹里application.yml文件中添加配置spring:rabbitmq:listener:simple:acknowledge-mode:manual#开启手动签收prefetch:3#一次就收三... 查看详情

rabbitmq高级特性之ttl存活时间/过期时间通俗易懂超详细内含案例(代码片段)

RabbitMq高级特性之TTL存活时间/过期时间每条消息设置过期时间整个Queue队列设置过期时间前提完成RabbitMq高级特性之消费端限流一、每条消息设置过期时间1.更改ProducerTest.java文件importlombok.extern.slf4j.Slf4j;importorg.junit.Test;importorg.juni... 查看详情

springboot整合rabbitmq高级特性&真实业务应用(代码片段)

♨️本篇文章记录的为RabbitMQ知识中高级特性和企业级项目相关内容,适合在学Java的小白,帮助新手快速上手,也适合复习中,面试中的大佬🙉🙉🙉。♨️如果文章有什么需要改进的地方还请大佬不吝赐教❤️... 查看详情

消息队列专题(高级特性篇):rabbitmq如何保证消息的可靠性投递传输和消费(代码片段)

我们使用RabbitMQ进行消息处理一般都需要保证消息的可靠性,而消息的可靠性又可以根据消息的不同处理阶段分为可靠性投递、传输和消费。本篇博客将针对这三种情况介绍相应的设计方案,首先来看一下消息的可靠性投... 查看详情

rabbitmq高级进阶(2021.05.29)(代码片段)

前篇回顾:RabbitMQ入门学习前篇回顾:springboor整合RabbitMQ1.RabbitMQ高级特性1.1消息的可靠投递在使用RabbitMQ的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ为我们提供了两种方式用来控制消息... 查看详情

rabbitmq高级特性之延迟队列通俗易懂超详细内含案例

RabbitMq高级特性之延迟队列消息进入队列后不能立即被消费,到达指定时间后才可被消费实现结合以下两种即可达到延迟队列RabbitMq高级特性之TTL过期时间RabbitMq高级特性之DLX死信队列延迟队列小结延迟队列指消息进入队列后,经过... 查看详情

消息队列专题(高级特性篇):rabbitmq如何保证消息的可靠性投递传输和消费(代码片段)

我们使用RabbitMQ进行消息处理一般都需要保证消息的可靠性,而消息的可靠性又可以根据消息的不同处理阶段分为可靠性投递、传输和消费。本篇博客将针对这三种情况介绍相应的设计方案,首先来看一下消息的可靠性投... 查看详情

rabbitmq学习(下)——发布确认高级幂等性优先级惰性和rabbitmq集群(代码片段)

RabbitMQ学习(下)——发布确认高级、幂等性、优先级、惰性和RabbitMQ集群一、发布确认高级1.1confirm确认模式1.2return退回模式1.3备份交换机二、幂等性、优先级、惰性2.1幂等性2.2优先级2.3惰性三、RabbitMQ集群3.1RabbitMQ集群... 查看详情

rabbitmq学习--发布确认高级(代码片段)

背景在生产环境中由于一些不明原因,导致rabbitmq重启,在RabbitMQ重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。于是,我们开始思考,如何才能进行RabbitMQ的消息可靠投递呢?特... 查看详情

rabbitmq学习--发布确认高级(代码片段)

背景在生产环境中由于一些不明原因,导致rabbitmq重启,在RabbitMQ重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。于是,我们开始思考,如何才能进行RabbitMQ的消息可靠投递呢?特... 查看详情

rabbitmq:发布确认高级(代码片段)

✨RabbitMQ:发布确认高级1.发布确认1.1发布确认机制方案1.2全局配置文件1.3配置类1.4生产者1.5消费者1.6测试1.7回调接口1.8改写生产者代码1.9测试2.回退消息2.1Mandatory参数2.2在全局配置文件中开启回退消息2.3生产者2.4回调接口2.5... 查看详情

springboot高级rabbitmq(代码片段)

一、概述二、Exchange(交换器)类型目前有四种:direct、fanout、topic、headers(1)direct路由键(routingkey)与队列名(name)完全一致(2)fanout交换器下所有队列都接收到消息( 查看详情

09-rabbitmq-发布确认高级-springboot版本(代码片段)

在生产环境中由于一些不明原因,导致rabbitmq重启,在RabbitMQ重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。于是,我们开始思考,如何才能进行RabbitMQ的消息可靠投递呢?特别是... 查看详情

rabbitmq——rabbitmq的高级特性(ttl死信队列延迟队列优先级队列rpc)

摘要本博文将介绍数据可靠性的一些细节,并展示RabbitMQ的几种已具备或衍生的高级特性,包括TTL、死信队列、延迟队列、优先级队列、RPC等,这些功能在实际使用中可以让某些应用的实现变得事半功倍。同时对源码进行详细的... 查看详情

rabbitmq研究高级使用(代码片段)

消息的流转mandatory参数之前的博客已经讲到了,当mandatory=true,交换器无法根据自身的类型和路由键找到一个符合条件的队列,会将消息返回给生产者。当mandatory=false,直接丢弃。生产者想要获取到没有合适... 查看详情

rabbitmq----消息可靠性传递(代码片段)

RabbitMQ的高级特性------消息可靠性传递  在开始之前,先想想几个问题,我们采用了生产者(channel)---->交换机------>队列中,那么如果其中生产者在向交换机发送信息是能一定发送成功吗?,该如何确认信... 查看详情

centos安装rabbitmq(代码片段)

RabbitMQ是流行的开源消息队列系统,是AMQP(AdvancedMessageQueuingProtocol高级消息队列协议)的标准实现,用erlang语言开发。RabbitMQ据说具有良好的性能和时效性,同时还能够非常好的支持集群和负载部署,非常适合在较大规模的分布... 查看详情