springboot下rabbitmq的实战应用:动态创建和动态监控队列死信备份交换机(代码片段)

土味儿~ 土味儿~     2023-01-20     173

关键词:

一、应用场景

  • 业务中心根据业务需求向特定用户发送消息;发送前不确定由哪个用户接收

  • 特定用户接收特定消息;用户可以退出,再切换别的用户登录,用户登录后只接收与自已对应的消息

二、总体要求

项目要足够稳健,消息不能丢失

  • 交换机、队列、消息持久化

  • 队列有容量限制;如:3000

  • 消息发送后需要确认(非自动确认)

  • 未发送成功的消息,由缓存保存,定时重发

  • 交换机收到消息,但无法投递时,转发至备份交换机,再广播至对应队列

  • 费时操作采用异步方式

三、架构图

四、安装RabbitMQ

参考如下三篇文章

五、搭建SpringBoot项目

  • java1.8

  • spring-boot 2.6.7

1、依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.7</version>
    </parent>

    <modelVersion>4.0.0</modelVersion>

    <groupId>com.tuwer</groupId>
    <artifactId>mq</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>
        <!-- amqp-client -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- amqp-client Java原生依赖 -->
<!--        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.14.2</version>
        </dependency>-->
        <!-- hutool-all -->
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.8.2</version>
        </dependency>
        <!-- jackson -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.13.3</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.datatype</groupId>
            <artifactId>jackson-datatype-jsr310</artifactId>
            <version>2.13.3</version>
        </dependency>
        <dependency>
            <groupId>jakarta.json</groupId>
            <artifactId>jakarta.json-api</artifactId>
            <version>2.0.1</version>
        </dependency>
        <!-- fastjson -->
        <dependency>
            <groupId>com.alibaba.fastjson2</groupId>
            <artifactId>fastjson2</artifactId>
            <version>2.0.3</version>
        </dependency>
        <!-- lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.22</version>
        </dependency>
        <!-- 工具类 -->
        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
            <version>2.6</version>
        </dependency>
        <!-- 测试 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>

2、application.yml

spring:
  rabbitmq:
    host: 192.168.3.174
    port: 5672
    username: admin
    password: admin
    virtual-host: /
    # 交换机接收确认
    publisher-confirm-type: correlated
    # 交换机回退消息
    #publisher-returns: true

2、启动类

@EnableAsync 开启异步操作

package com.tuwer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;

/**
 * @author 土味儿
 * Date 2023/1/4
 * @version 1.0
 */
@EnableAsync
@SpringBootApplication
public class MqApp 
    public static void main(String[] args) 
        SpringApplication.run(MqApp.class, args);
    

3、基础类

3.1、常量类

package com.tuwer.constant;

/**
 * <p>系统常量类</p>
 *
 * @author 土味儿
 * Date 2023/1/4
 * @version 1.0
 */
public class Constants 
    /**
     * 队列容量、通道预取值
     * 队列容量应根据项目需要,设置合适的值;
     * 本案例中为了测试方便设为5
     */
    public static final int QUEUE_CAPACITY = 5;
    public static final int PRE_FETCH_SIZE = 10;

    /**
     * 交换机
     */
    public static final String NORMAL_EXCHANGE = "normal_exchange";
    public static final String BACKUP_EXCHANGE = "backup_exchange";

    /**
     * 队列
     */
    public static final String BACKUP_QUEUE = "backup_queue";

3.2、雪花算法工具类

获取Long型id:SnowflakeUtil.getInstance().nextId()

package com.tuwer.util;

import lombok.extern.slf4j.Slf4j;

import java.text.MessageFormat;

/**
 * <p>雪花算法工具类</p>
 *
 * @author 土味儿
 * Date 2022/6/2
 * @version 1.0
 */
@Slf4j
@SuppressWarnings("all")
public class SnowflakeUtil 
    // ==============================Fields===========================================
    /**
     * 开始时间戳 (2000-01-01 00:00:00)
     */
    private static final long TWEPOCH = 946656000000L;

    /**
     * 机器id所占的位数 5
     */
    private static final long WORKER_ID_BITS = 5L;

    /**
     * 数据标识id所占的位数 5
     */
    private static final long DATA_CENTER_ID_BITS = 5L;

    /**
     * 支持的最大机器id,结果是 31
     */
    private static final long MAX_WORKER_ID = ~(-1L << WORKER_ID_BITS);

    /**
     * 支持的最大数据标识id,结果是 31
     */
    private static final long MAX_DATA_CENTER_ID = ~(-1L << DATA_CENTER_ID_BITS);

    /**
     * 序列在id中占的位数
     */
    private static final long SEQUENCE_BITS = 12L;

    /**
     * 机器ID向左移12位
     */
    private static final long WORKER_ID_SHIFT = SEQUENCE_BITS;

    /**
     * 数据标识id向左移17位(12+5)
     */
    private static final long DATA_CENTER_ID_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS;

    /**
     * 时间戳向左移22位(5+5+12)
     */
    private static final long TIMESTAMP_LEFT_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS + DATA_CENTER_ID_BITS;

    /**
     * 生成序列的掩码,这里为4095 (0b111111111111=0xfff=4095)
     */
    private static final long SEQUENCE_MASK = ~(-1L << SEQUENCE_BITS);

    /**
     * 步长 1024
     */
    private static final long STEP_SIZE = 1024;

    /**
     * unsigned int max value
     */
    private static final long UINT_MAX_VALUE = 0xffffffffL;

    /**
     * 工作机器ID(0~31)
     */
    private long workerId;

    /**
     * 工作机器ID 计数器
     */
    private long workerIdFlags = 0L;

    /**
     * 数据中心ID(0~31)
     */
    private long dataCenterId;

    /**
     * 数据中心ID 计数器
     */
    private long dataCenterIdFlags = 0L;

    /**
     * 毫秒内序列(0~4095)
     */
    private long sequence = 0L;

    /**
     * 毫秒内序列基数[0|1024|2048|3072]
     */
    private long basicSequence = 0L;

    /**
     * 上次生成ID的时间戳
     */
    private long lastTimestamp = -1L;

    /**
     * 工作模式
     */
    private final WorkMode workMode;

    public enum WorkMode NON_SHARED, RATE_1024, RATE_4096;

    //==============================单例模式(静态内部类)=====================================
    private static class InnerClass
        private static final SnowflakeUtil INNER_DEMO = new SnowflakeUtil();
    
    public static SnowflakeUtil getInstance()
        return InnerClass.INNER_DEMO;
    

    //==============================Constructors=====================================

    public SnowflakeUtil() 
        this(0, 0, WorkMode.RATE_4096);
    

    /**
     * 构造函数
     *
     * @param workerId     工作ID (0~31)
     * @param dataCenterId 数据中心ID (0~31)
     */
    public SnowflakeUtil(long workerId, long dataCenterId) 
        this(workerId, dataCenterId, WorkMode.RATE_4096);
    

    /**
     * 构造函数
     *
     * @param workerId     工作ID (0~31)
     * @param dataCenterId 数据中心ID (0~31)
     * @param workMode     工作模式
     */
    public SnowflakeUtil(long workerId, long dataCenterId, WorkMode workMode) 
        this.workMode = workMode;
        if (workerId > MAX_WORKER_ID || workerId < 0) 
            throw new IllegalArgumentException(MessageFormat.format("worker Id can't be greater than 0 or less than 0", MAX_WORKER_ID));
        
        if (dataCenterId > MAX_DATA_CENTER_ID || dataCenterId < 0) 
            throw new IllegalArgumentException(MessageFormat.format("datacenter Id can't be greater than 0 or less than 0", MAX_DATA_CENTER_ID));
        
        this.workerId = workerId;
        this.workerIdFlags = setSpecifiedBitTo1(this.workerIdFlags, this.workerId);
        this.dataCenterId = dataCenterId;
        this.dataCenterIdFlags = setSpecifiedBitTo1(this.dataCenterIdFlags, this.dataCenterId);
    

    // ==============================Methods==========================================

    /**
     * 获取机器id
     *
     * @return 所属机器的id
     */
    public long getWorkerId() 
        return workerId;
    

    /**
     * 获取数据中心id
     *
     * @return 所属数据中心id
     */
    public long getDataCenterId() 
        return dataCenterId;
    

    /**
     * 获得下一个ID (该方法是线程安全的)
     *
     * @return SnowflakeId
     */
    public synchronized long nextId() 
        long timestamp = timeGen();
        //如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过这个时候应当抛出异常
        if (timestamp < this.lastTimestamp) 
            if (timestamp > TWEPOCH) 
                if (WorkMode.NON_SHARED == this.workMode) 
                    nonSharedClockBackwards(timestamp);
                 else if (WorkMode.RATE_1024 == this.workMode) 
                    rate1024ClockBackwards(timestamp);
                 else 
                    throw new RuntimeException(MessageFormat.format("Clock moved backwards. Refusing to generate id for 0 milliseconds", lastTimestamp - timestamp));
                
             else 
                throw new RuntimeException(MessageFormat.format("Clock moved backwards. Refusing to generate id for 0 milliseconds", lastTimestamp - timestamp));
            
        
        //如果是同一时间生成的,则进行毫秒内序列
        if (this.lastTimestamp == timestamp) 
            this.sequence = (this.sequence + 1) & SEQUENCE_MASK;
            //毫秒内序列溢出
            if (this.sequence == 0) 
                //阻塞到下一个毫秒,获得新的时间戳
                timestamp = tilNextMillis(this.lastTimestamp);
            
        
        //时间戳改变,毫秒内序列重置
        else 
            this.sequence = this.basicSequence;
        
        //上次生成ID的时间戳
        this.lastTimestamp = timestamp;
        //移位并通过或运算拼到一起组成64位的ID
        return ((timestamp - TWEPOCH) << TIMESTAMP_LEFT_SHIFT)
                | (this.dataCenterId << DATA_CENTER_ID_SHIFT)
                | (this.workerId << WORKER_ID_SHIFT)
                | this.sequence;
    

    /**
     * 阻塞到下一个毫秒,直到获得新的时间戳
     *
     * @param lastTimestamp 上次生成ID的时间戳
     * @return 当前时间戳
     */
    protected long tilNextMillis(long lastTimestamp) 
        long timestamp0;
        do 
            timestamp0 = timeGen();
         while (timestamp0 <= lastTimestamp);
        return timestamp0;
    

    /**
     * 返回以毫秒为单位的当前时间
     *
     * @return 当前时间(毫秒)
     */
    protected long timeGen() 
        return System.currentTimeMillis();
    

    /**
     * 尝试解决时钟回拨<br>【* 仅用于 单机生成不对外 的情况 *】
     *
     * @param timestamp 当前时间戳
     * @return void
     */
    private void nonSharedClockBackwards(long timestamp) 
        if (this.dataCenterIdFlags >= UINT_MAX_VALUE && this.workerIdFlags >= UINT_MAX_VALUE) 
            throw new RuntimeException(MessageFormat.format("Clock moved backwards. Refusing to generate id for 0 milliseconds", lastTimestamp - timestamp));
         else 
            //如果仅用于生成不重复的数值,尝试变更 dataCenterId 或 workerId 修复时钟回拨问题

            log.warn("Clock moved backwards. Refusing to generate id for  milliseconds", lastTimestamp - timestamp);
            //先尝试变更 dataCenterId,当 dataCenterId 轮询一遍之后,尝试变更 workerId 并重置 dataCenterId
            if (this.dataCenterIdFlags >= UINT_MAX_VALUE) 
                if (++this.workerId > MAX_WORKER_ID) 
                    this.workerId = 0L;
                
                this.workerIdFlags = setSpecifiedBitTo1<

springboot整合rabbitmq之典型应用场景实战三

实战前言RabbitMQ作为目前应用相当广泛的消息中间件,在企业级应用、微服务应用中充当着重要的角色。特别是在一些典型的应用场景以及业务模块中具有重要的作用,比如业务服务模块解耦、异步通信、高并发限流、超时业务... 查看详情

springboot下rabbitmq的实战应用:动态创建和动态监控队列死信备份交换机(代码片段)

一、应用场景业务中心根据业务需求向特定用户发送消息;发送前不确定由哪个用户接收特定用户接收特定消息;用户可以退出,再切换别的用户登录,用户登录后只接收与自已对应的消息二、总体要求项目要足... 查看详情

rabbitmq一文彻底弄懂rabbitmq的四种交换机原理及springboot实战应用(代码片段)

...原理及实战应用交换机概念direct直连交换机工作模式图解springboot代码Fanout扇出交换机工作模式图解springboot代码Topic主题交换机工作模式图解springboot代码header交换机交换机概念交换机可以理解成具有路由表的路由程序,仅此... 查看详情

springboot整合rabbitmq之整合配置篇

实战背景:RabbitMQ实战第一阶段-RabbitMQ的官网拜读已经结束了,相信诸位童鞋或多或少都能入了个门,如果还是觉得迷迷糊糊似懂非懂的,那我建议诸位可以亲自去拜读拜读官网的技术手册或者看多几篇我的视频跟源码!因为接... 查看详情

springboot整合rabbitmq之发送接收消息实战

实战前言前几篇文章中,我们介绍了SpringBoot整合RabbitMQ的配置以及实战了Spring的事件驱动模型,这两篇文章对于我们后续实战RabbitMQ其他知识要点将起到奠基的作用的。特别是Spring的事件驱动模型,当我们全篇实战完毕RabbitMQ并... 查看详情

springboot实战之rabbitmq

什么是RabbitMQ?RabbitMQ是一个消息代理。它的核心原理非常简单:接收和发送消息。你可以把它想像成一个邮局:你把信件放入邮箱,邮递员就会把信件投递到你的收件人处。在这个比喻中,RabbitMQ就扮演着邮箱、邮局以及邮递员... 查看详情

带着新人学springboot的应用07(springboot+rabbitmq下)

  说一两句废话,强烈推荐各位小伙伴空闲时候也可以写写自己的博客!不管水平高低,不管写的怎么样,不要觉得写不好或者水平不够就不写了(咳,我以前就是这样的想法。。。自我反省!)。  但是开始写博客之后,... 查看详情

六.rabbitmq消息队列的基础+实战

六.RabbitMQ消息队列的基础+实战1.消息队列简介2.消息队列的使用流程3.windows下的安装RabbitMQ4.rabbitmq和springboot结合5.rabbitmq结合springboot的重要类未完待续六.RabbitMQ消息队列的基础+实战1.消息队列简介①消息队列概 查看详情

springboot(二十二)集成rabbitmq---mq实战演练

RabbitMQ是一个在AMQP基础上完成的,可复用的企业消息系统。他遵循MozillaPublicLicense开源协议。RabbitMQ是流行的开源消息队列系统,用erlang语言开发。RabbitMQ是AMQP(高级消息队列协议)的标准实现。消息中间件的工作过程可以用生... 查看详情

重磅发布-rabbitmq实战系列完整视频教程

...技术手册,目的在于入门认识各大专有名词;第二阶段为SpringBoot整合RabbitMQ实战各种实际的业务模块并解决常见的问题!目的:对于消息中间件, 查看详情

springboot实战系列rabbitmq实现消息发送并实现邮箱发送异常监控报警实战(代码片段)

...待你的关注作业侠系列最新文章😉Java实现聊天程序SpringBoot实战系列🐷【SpringBoot实战系列】RabbitMQ实现消息发送并实现邮箱发送异常监控报警实战环境搭建大集合环境搭建大集合(持续更新)在本栏中,我们之前... 查看详情

rabbitmq实战

RabbitMq消息消费者服务 开发工具Idea和Springboot来开发的。消息消费目前只是一个简单的Demo,后续会处理成更智能一些。首先配置文件类,RabbitMqConfig,里面配置一些用户名和密码嗨哟队列信息。package com.basic.rabbitmq.consumer.... 查看详情

springboot整合rabbitmq:5种模式实战(代码片段)

一、环境准备1、pom依赖<!--父工程依赖--><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.6.RELEASE</vers 查看详情

docker下rabbitmq四部曲之三:细说java开发

...是《Docker下RabbitMQ四部曲》系列的第三篇,实战两个基于SpringBoot的工程,分别用来生产和消费RabbitMQ消息;前文链接前两章的内容是体验RabbitMQ服务,以及制作RabbitMQ镜像:《Docker下RabbitMQ四部曲之一:极速体验(单机和集群)》;... 查看详情

javaspringboot集成rabbitmq实战和总结(代码片段)

...不错。因为书中讲的都是Python和Php的例子,所以自己结合SpringBoot文档和朱小厮的博客做了一些总结,写了一些Springboot的例子。交换器、队列、绑定的声明SpringAMQP项目对RabbitMQ做了很好的封装,可以很方便的手动声明队列,交换... 查看详情

「mq实战」rabbitmq延迟队列,消息延迟推送

...统解决方案无疑大大降低了系统的整体性能和吞吐量:在RabbitMQ3.6.x之前我们一般采用死信队列+TTL过期时间来实现延迟队列,我们这里不做过多介绍,可以参考之前文章来了解:TTL、死信队列在RabbitMQ3.6.x开始,RabbitMQ官方提供了... 查看详情

带着新人学springboot的应用05(springboot+rabbitmq上)

  这次就来说说RabbitMQ,这个应该不陌生了,随便一查就知道这个是用来做消息队列的。(注意:这一节很多都是概念的东西,需要操作的比较少)  至于AMQP协议(AdvancedMessageQueuingProtocol),专业名称叫做高级消息队列协议,... 查看详情

activemq的作用总结(应用场景及优势)以及springboot+activemq实战(代码片段)

 业务场景说明:消息队列在大型电子商务类网站,如京东、淘宝、去哪儿等网站有着深入的应用,队列的主要作用是消除高并发访问高峰,加快网站的响应速度。在不使用消息队列的情况下,用户的请求数据直接写入数据库... 查看详情