springboot使用rabbitmq实现延时任务(代码片段)

wintercloud wintercloud     2022-12-10     241

关键词:

延时队列顾名思义,即放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费。
那么,为什么需要延迟消费呢?我们来看以下的场景

订单业务: 在电商/点餐中,都有下单后 30 分钟内没有付款,就自动取消订单。
短信通知: 下单成功后 60s 之后给用户发送短信通知。
失败重试: 业务操作失败后,间隔一定的时间进行失败重试。

本文基于springboot,使用rabbitmq_delayed_message_exchange插件实现延时队列(RabbitMQ及其插件环境安装点此),具体实践如下:

application.properties

spring.rabbitmq.username=root
spring.rabbitmq.password=root
spring.rabbitmq.host=192.168.1.123
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=test

XdelayConfig.java

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class XdelayConfig 
 
    /**
     * 立即消费的队列名称
     */
    public static final String IMMEDIATE_QUEUE_XDELAY = "queue.xdelay.immediate";
    /**
     * 延时的exchange
     */
    public static final String DELAYED_EXCHANGE_XDELAY = "exchange.xdelay.delayed";
    public static final String DELAY_ROUTING_KEY_XDELAY = "routingkey.xdelay.delay";

    /**
     * 创建一个立即消费队列
     *
     * @return
     */
    @Bean
    public Queue immediateQueue() 
        // 第一个参数是创建的queue的名字,第二个参数是是否支持持久化
        return new Queue(IMMEDIATE_QUEUE_XDELAY, true);
    

    @Bean
    public CustomExchange delayExchange() 
        Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(DELAYED_EXCHANGE_XDELAY, "x-delayed-message", true, false, args);
    

    /**
     * 把立即消费的队列和延时消费的exchange绑定在一起
     *
     * @return
     */
    @Bean
    public Binding bindingNotify() 
        return BindingBuilder.bind(immediateQueue()).to(delayExchange()).with(DELAY_ROUTING_KEY_XDELAY).noargs();
    

XdelaySender.java 生产者

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * 生产者
 */
@Component
public class XdelaySender 
    private final static Logger logger = LoggerFactory.getLogger(XdelaySender.class);
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(String msg, int delayTime) 
        logger.info("msg= " + msg + ".delayTime" + delayTime);
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        this.rabbitTemplate.convertAndSend(XdelayConfig.DELAYED_EXCHANGE_XDELAY, XdelayConfig.DELAY_ROUTING_KEY_XDELAY, msg, message -> 
            message.getMessageProperties().setDelay(delayTime);
            System.out.println(sdf.format(new Date()) + " Delay sent.");
            return message;
        );
    

XdelayReceiver.java 消费者

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * 消费者
 */
@Component
@EnableRabbit
@Configuration
public class XdelayReceiver 
    private final static Logger logger = LoggerFactory.getLogger(XdelayReceiver.class);

    @RabbitListener(queues = com.example.antchat.rabbitmq.XdelayConfig.IMMEDIATE_QUEUE_XDELAY)
    public void get(String msg) 
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        logger.info("收到延时消息时间:" + sdf.format(new Date()) + " Delay sent.");
        logger.info("收到延时消息:" + msg);
    

测试

import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class RabbitMQController 
    @Autowired
    XdelaySender xdelaySender;
    @RequestMapping("/testRabbit")
    public void testRabbit() 
        xdelaySender.send("我来发一个测试消息,10秒", 10000);//10秒
        xdelaySender.send("我来发一个测试消息,2秒", 2000);//2秒
        xdelaySender.send("我来发一个测试消息,1秒", 2000);//1秒
    

参考博文:

微服务-springboot-rabbitmq:实现延时队列

 

rabbitmq实现延时队列-springboot版本(代码片段)

 rabbitmq本身没有实现延时队列,但是可以通过死信队列机制,自己实现延时队列; 原理:当队列中的消息超时成为死信后,会把消息死信重新发送到配置好的交换机中,然后分发到真实的消费队列;步骤:1、创建带有时... 查看详情

springboot-rabbitmq:实现延时队列

延时队列应用于什么场景延时队列顾名思义,即放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费。那么,为什么需要延迟消费呢?我们来看以下的场景   网上商城下订单后30分钟后没有... 查看详情

rabbitmq---延迟队列,整合springboot(代码片段)

...场景RabbitMQ中的TTL队列设置TTL消息设置TTL两者的区别整合springbootpom文件配置文件添加Swagger配置类队列TTL代码架构代码实现延时队列优化代码架构图实现Rabbitmq插件实现延迟队列安装延时队列插件代码架构图总结发布确认高级介绍... 查看详情

rabbitmq实现延时队列(代码片段)

...列后不会立即被消费,可以被延迟一定的时间,再进行消费.RabbitMQ没有提供延迟队列功能,但是可以使用TTL+DLX来实现延迟队列效果使用场景电商平台下单后,30分钟未支付,取消订单回滚库存;新用户注册成功一周后,发送问候短信等... 查看详情

08-rabbitmq-springboot-延时队列(代码片段)

一、springBoot整合RabbitMQ1、IDEA创建一个SpringBoot的项目2、导入相关的依赖<!--导入依赖--><dependencies><!--RabbitMQ依赖--><dependency><groupId>org.springframework.boot</groupId><artifactI 查看详情

基于rabbitmq实现分布式延时任务调度(代码片段)

...特殊处理(加分布式锁)避免任务被重复执行。然而使用RabbitMQ实现延时任务可以天然解决分布式环境下重复执行的问题(利用mq中消息只会被一个消费者消费这一特性可以让延时任务只会被一个消费者执行)。基于RabbitMQ做延时... 查看详情

rabbitmq实现延时消息的两种方式

参考技术ARabbitmq实现延时消息的两种方式使用队列的ttl特性,即配置死信队列的消息重新路由到消费队列中,同一个队列的消息过期时间将相同,即使消息本身可能带上了ttl,同样会因队头的消息未过期而无法消费;定义两个队... 查看详情

springbootrabbitmq集成,延时消息队列实现

本篇主要记录Springboot集成Rabbitmq,分为两部分,第一部分为创建普通消息队列,第二部分为延时消息队列实现: springboot提供对mq消息队列支持amqp相关包,引入即可: [html] viewplain copy <!-- rabbit mq -... 查看详情

rabbitmq安装延时队列插件实现延时队列(代码片段)

下载插件地址要注意和自己的rabbitmq的版本对应起来https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases我的mq是docker安装的3.9.7的下载完之后把插件copy到mq的plugin目录下,然后启用插件。之后重启容器,我这里是docker-com... 查看详情

springboot学习——springboot快速整合rabbitmq

RabbitMQ消息队列@[toc]简介优点erlang开发,并发能力强。社区活跃,使用的人多,稳定性较强。延时低缺点erlang语言开发的,国内精通的不多,日后定制开发困难。RabbitMQ工作模式1,"HelloWorld!"模式简单模式是RabbitMQ最简单入... 查看详情

springcloud使用stream配置rabbitmq实现延时消息(代码片段)

先安装rabbitMq延时插件参考我另一篇文章https://blog.csdn.net/weixin_43944305/article/details/120828003上依赖<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><g... 查看详情

rabbitmq:伪延时队列(代码片段)

目录一、什么是延时队列二、RabbitMQ实现三、延时队列的问题四、解决RabbitMQ的伪延时方案ps:伪延时队列先卖个关子,我们先了解下延时队列。一、什么是延时队列所谓延时队列是指消息push到队列后,监听的消费者不能第一时间... 查看详情

springboot执行延时任务-delayqueue的使用

DelayQueue简介在很多场景我们需要用到延时任务,比如给客户异步转账操作超时后发通知告知用户,还有客户下单后多长时间内没支付则取消订单等等,这些都可以使用延时任务来实现。jdk中DelayQueue可以实现上述需求,顾名思义D... 查看详情

[springboot]springboot(14)rabbitmq延迟队列

1前言  延迟队列的使用场景:1.未按时支付的订单,30分钟过期之后取消订单;2.给活跃度比较低的用户间隔N天之后推送消息,提高活跃度;3.过1分钟给新注册会员的用户,发送注册邮件等。  实现延迟队列的方式有两种:... 查看详情

rocketmq延时消息实现原理探究(代码片段)

...消息的场景,而且目前业务中使用到的消息中间件有rabbitmq和kafka,对延时消息的支持都不太理想。其中rabbitmq延时消息是通过设置队列ttl+死信exchange实现缺点嘛:每次都得设置两个队列,一个用来实现延时࿰... 查看详情

springboot(十四)rabbitmq延迟队列

一、前言延迟队列的使用场景:1.未按时支付的订单,30分钟过期之后取消订单;2.给活跃度比较低的用户间隔N天之后推送消息,提高活跃度;3.过1分钟给新注册会员的用户,发送注册邮件等。实现延迟队列的方式有两种:通过... 查看详情

rabbitmq延时队列,订单过期,取消支付场景(代码片段)

...队列的延时队列基于延时交换机的延时队列延时队列图解SpringBoot伪代码配置类生产者消息的TTL和队列的TTL总结RabbitMq延时队列实际应用场景比如,boss让你开发一个30分钟客户不付款就取消订单的场景;如果在促销活动期... 查看详情

springboot(十四)rabbitmq延迟队列

一、前言延迟队列的使用场景:1.未按时支付的订单,30分钟过期之后取消订单;2.给活跃度比较低的用户间隔N天之后推送消息,提高活跃度;3.过1分钟给新注册会员的用户,发送注册邮件等。<!--more-->实现延迟队列的方式有... 查看详情