rocketmq使用延迟消息(代码片段)

乐观男孩 乐观男孩     2022-12-11     450

关键词:

目录

说明

1、延时消息不是延迟发送,消息是实时发送的,只是消费者延迟消费
2、延迟消息主要通过对Message设置延迟级别实现,生产者和消费者按照正常逻辑进行生产和消费。

生产端

@Test
    public void sendMessage() throws Exception 
        DefaultMQProducer defaultMQProducer = RocketMqUtil.getDefaultMQProducer();
        Message message = new Message(RocketMqUtil.TOPIC, "schedule",
                "schedule-message".getBytes(Charset.forName("UTF-8")));
        //设置延迟级别,延迟级别≠延迟时间
        message.setDelayTimeLevel(5);
        SendResult sendResult = defaultMQProducer.send(message);
        log.info("发送消息结果:", sendResult.getSendStatus().name());
    

消费端

@Test
    public void consumer() throws Exception 
        DefaultMQPushConsumer defaultMQPushConsumer = RocketMqUtil.getDefaultMQPushConsumer();
        defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        defaultMQPushConsumer.subscribe(RocketMqUtil.TOPIC, "schedule");
        defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() 
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) 
                for (MessageExt messageExt : msgs) 
                    log.info("出生时间:,存储时间:,当前时间:,消息内容:",
                            sdf.format(new Date(messageExt.getBornTimestamp())),
                            sdf.format(new Date(messageExt.getStoreTimestamp())),
                            sdf.format(new Date()),
                            new String(messageExt.getBody(), Charset.forName("UTF-8")));
                
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            
        );
        defaultMQPushConsumer.start();
        Thread.sleep(30000L);
        defaultMQPushConsumer.shutdown();
    

运行结果示例


这里解释一下:
BornTimestamp:消息的出生时间,这是在客户端发送消息前设置(DefaultMQProducerImpl.sendKernelImpl)。
StoreTimestamp:消息的存储时间(并不是指服务端接收的时间),这是在服务端进行设置的(CommitLog.asyncPutMessage)。

从打印的结果可以看出,延迟时间是从BornTimestamp开始计算的,客户端发送消息很快,消息到达服务端后即返回。服务端在延迟指定时间后,消息才会被消费端看到。如果没有达到指定的时间,RocketMq-console也无法看到该消息。

延迟级别与延迟时间对应关系

延迟级别123456789101112131415161718
延迟时间1s5s10s30s1m2m3m4m5m6m7m8m9m10m20m30m1h2h

rocketmq使用延迟消息(代码片段)

...blicvoidsendMessage()throwsExceptionDefaultMQProducerdefaultMQProducer=RocketMqUtil.getDefaultMQProducer();Messagemessage=newMessage(RocketMqUtil.TOPIC,"schedule","schedule-message".getBytes(Charset.forName("UTF-8")));//设置延迟级别,延迟级别≠延迟... 查看详情

rocketmq自定义消息与延迟消息(代码片段)

...的格式使用JSON进行定义可以提高消息内容的扩展性,RocketMQ支持传递JSON数据格式。代码示例在生产端和消费端定义模型类:packagecom.yyl.test.rocketmq.model;importlombok.Data;importlombok.NoArgsConstructor;importlombok.ToString;importjava.io.Seriali... 查看详情

10springboot整合rocketmq实现延迟消息(代码片段)

延迟消息对于消息中间件来说,producer将消息发送到mq的服务器上,但并不希望这条消息马上被消费,而是推迟到当前时间节点之后的某个时间点,再将消息投递到queue中让consumer进行消费。​延迟消息的使用场景... 查看详情

rocketmq源码(24)—defaultmqpushconsumer延迟消息源码(代码片段)

基于RocketMQrelease-4.9.3,深入的介绍了DefaultMQPushConsumer延迟消息源码。文章目录1load加载延迟消息数据1.1parseDelayLevel解析延迟等级2start启动调度消息服务3DeliverDelayedMessageTimerTask投递延迟消息任务3.1executeOnTimeup执行延迟消息投递3... 查看详情

三.rocketmq极简入门-rocketmq普通消息发送(代码片段)

前言RocketMQ已经写了两章了,一章是RocketMQ认识和安装,一章是RocketMQ的工作流程和核心概念,本章我们开始使用RocketMQ来发送和接收消息。RocketMQ的消息种类非常多,比如:普通消息,顺序消息,延迟消... 查看详情

rocketmq快速入门:消息发送延迟消息消费重试(代码片段)

...分享🐳。偶尔认知思考、日常水文🐌。目录1、RocketMQ消息结构1.1、消息结构1.2、三种消息发松方式2、快速搭建工程2.1、创建rocketmq-demo父工程2.2、生产者工程2.3、消费者工程2.4、消息发送过程3、消息发送过程4、三种消... 查看详情

一次rocketmq顺序消费延迟的问题定位(代码片段)

一次RocketMQ顺序消费延迟的问题定位问题背景与现象昨晚收到了应用报警,发现线上某个业务消费消息延迟了54s多(从消息发送到MQ到被消费的间隔):2021-06-30T23:12:46.756messageprocessingisincrediblydelayed!(Currentdelaytime:547... 查看详情

rocketmq(十五)延时消息(代码片段)

...时长后才可被消费处理的消息,称为延时消息。采用RocketMQ的延时消息可以实现定时任务的功能,而无需使用定时器。典型的应用场景是,电商交易中超时未支付关闭订单的场景,12306平台订票超时未支付取消订票... 查看详情

rocketmq延时消息自定义配置;topic下tag使用(代码片段)

概述使用的是开源版本的rocketmq4.9.4rocketmq也是支持延时消息的。rocketmq一般是4个部分:nameserver:保存路由信息broker:保存消息生产者:生产消息消费者:消费消息延时消息的处理是在其中的broker中。但是rocketm... 查看详情

rocketmq延时消息(代码片段)

rocketmq提供一种延时消息的解决方案,就是在特定的时间到了,消息才会被投递出去供consumer消费。总体来是简单的场景是满足了,但是需要注意的是延时的时间是需要按照默认配置的延时级别去配置的,而不是随意设置消息的... 查看详情

rocketmq事务消息篇之事务消息的使用(代码片段)

前言在RocketMQ事务消息篇(一)之事务消息的介绍里对RocketMQ的事务消息作了相关说明,本文提供一些基本的开发示例。java示例依赖<dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>ro 查看详情

rocketmq源码—十rocketmq顺序消息(代码片段)

RocketMQ本身支持顺序消息,在使用上发送顺序消息和非顺序消息有所区别发送顺序消息SendResultsendResult=producer.send(msg,newMessageQueueSelector()@OverridepublicMessageQueueselect(List<MessageQueue>mqs,Messagemsg,Objectarg)Integeri 查看详情

rocketmq源码—十rocketmq顺序消息(代码片段)

RocketMQ本身支持顺序消息,在使用上发送顺序消息和非顺序消息有所区别发送顺序消息SendResultsendResult=producer.send(msg,newMessageQueueSelector()@OverridepublicMessageQueueselect(List<MessageQueue>mqs,Messagemsg,Objectarg)Integeri 查看详情

rocketmq使用顺序消息(代码片段)

目录说明生产端消费端总结说明RocketMQ与其它消息队列一样,一个Topic利用多个队列来存储数据,单个队列内的数据是顺序存储的,但队列间的数据无法保证顺序性。RocketMQ目前支持保证某类数据或部分数据的顺序性。... 查看详情

rocketmq有序消息(代码片段)

RocketMQ提供的顺序消费消息实现是使用的FIFO先进先出算法Producer消息发送publicclassProducerpublicstaticvoidmain(String[]args)throwsUnsupportedEncodingExceptiontryMQProducerproducer=newDefaultMQProducer("please_rename_unique_ 查看详情

rocketmq使用批量消息(代码片段)

目录说明生产端消费端遗留问题说明批量发送消息条件:1、发送到同一个topic2、等待同一个发送结果3、不允许使用定时消息4、不支持半事务特性5、同一批消息大小不能超过1MB,否则需要自己进行切割发送批量消息,... 查看详情

rocketmq之批处理消息(代码片段)

批量消息发送批量消息发送能够提高发送效率,提升系统吞吐量。同一批批量消息的topic、waitStoreMsgOK属性必须保持一致,批量消息不支持延迟消息。批量消息发送一次最多可以发送4MiB的消息,但是如果需要发送更大... 查看详情

rocketmq之批处理消息(代码片段)

批量消息发送批量消息发送能够提高发送效率,提升系统吞吐量。同一批批量消息的topic、waitStoreMsgOK属性必须保持一致,批量消息不支持延迟消息。批量消息发送一次最多可以发送4MiB的消息,但是如果需要发送更大... 查看详情