springboot整合rocketmq的各种消息类型,生产者,消费者(代码片段)

可——叹——落叶飘零 可——叹——落叶飘零     2022-10-21     598

关键词:

文章目录

Springboot整合使用

pom依赖

 <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.1.0</version>
</dependency>

yml

group: producer-demo1指定生产者组名

rocketmq:
  name-server: 192.168.126.100:9876;192.168.64.152:9876
  producer:
    #生产者组名,规定在一个应用里面必须唯一
    group: group-1
    #消息发送的超时时间,默认为3000ms
    send-message-timeout: 3000
    #消息达到4096字节的时候,消息就会被压缩。默认4096
    compress-message-body-threshold: 4096
    #最大的消息限制 默认为128K
    max-message-size: 4194304
    #同步消息发送失败重试次数
    retry-times-when-send-failed: 3
    #在内部发送失败时是否重试其他代理,这个参数在有多个broker才生效。
    retry-next-server: true
    #异步消息发送失败重试的次数
    retry-times-when-send-async-failed: 3
    

整合使用

消息类型不同比如string,pojo实体类型需要分不同的topic,同consumer-group需要对应一种topic,consumer不管配置什么topic,tag,最后都是以group为准

如果某个consumer的group名字与group-name1相同,但这个consumer的topic信息与group-name1这个组的topic不同,这会导致group-name1这个组的consumer不能接收全部消息,也就是会导致部分消息无法被消费,所以不要乱操作

(如果group不同但是topic和tag都相同,那么所有group都会消费同样的消息,有多少个group就消费多少次)

(一台机器可部署多个consumer,需要保证他们的group-name不同,通常topic与group一一对应,所以topic也不同)

为方便查看,示例关键代码
某个service层的实现类MQServiceImpl

package com.ro.service;
import com.ro.pojo.User;
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.PayloadApplicationEvent;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

@Service
public class MQServiceImpl implements MQService

    Logger log= LoggerFactory.getLogger(MQServiceImpl.class);
    @Autowired
    private RocketMQTemplate mqTemplate;

    /**
     * 同步消息
     * 消息发送方发出数据后,会在收到接收方发回响应之后才发下一个数据包的通讯方式
     */
    @Override
    public void syncMQMessageSend() 
        mqTemplate.syncSend("topic1:tag1", "hello1");
        mqTemplate.syncSend("topic1:tag1", "hello2");
        SendResult s2=mqTemplate.syncSend("topic1:tag1", "hello3");
        log.info("3条同步消息String类型已发送:topic:topic1,tag:tag1:",s2);
        User user=new User("tom",100);
        SendResult result=mqTemplate.syncSend("topic2:tag1", MessageBuilder.withPayload(user).build());
        //可以简写成以下,直接传入pojo对象
        SendResult result2=mqTemplate.syncSend("topic2:tag1", user.setName("lily").setAge(200));
        log.info("object类型同步消息发送结果:,",result,result2);
    

    /**
     * 异步消息
     * 指发送方发出数据后,不等接收方发回响应,接着发送下个数据包
     * 关键实现异步发送回调接口(SendCallback)
     * 在执行消息的异步发送时应用不需要等待服务器响应即可直接返回,通过回调接口接收务器响应,并对服务器的响应结果进行处理
     * 这种方式任然需要返回发送消息任务的执行结果,异步不影响后续任务,不会造成阻塞
     */
    @Override
    public void asyncMQMessageSend() 
        User user=new User("tom",100);
        mqTemplate.asyncSend("topic3:tag1", user, new SendCallback() 
            @Override
            public void onSuccess(SendResult sendResult) 
                log.info("异步消息发送成功:",sendResult);
            

            @Override
            public void onException(Throwable throwable) 
                log.info("异步消息发送失败:",throwable.getMessage());
            
        );
    

    /**
     * 单向消息
     * 特点为只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答
     * 此方式发送消息的过程耗时非常短,一般在微秒级别
     * 应用场景:适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集
     */
    @Override
    public void oneWaySendMQMessageSend() 
        User user=new User("tom",100);
        mqTemplate.sendOneWay("topic4:tag1", user);
        log.info("单向消息已发送");
    
    /**
     * 延迟消息
     * rocketMQ的延迟消息发送其实是已发送就已经到broker端了,然后消费端会延迟收到消息。
     * RocketMQ 目前只支持固定精度的定时消息。
     * 固定等级:1到18分别对应1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
     * 延迟的底层方法是用定时任务实现的。
     */
    @Override
    public void delayedSendMQMessageSend() 
        User user=new User("tom",100);
        SendCallback sc=new SendCallback() 
            @Override
            public void onSuccess(SendResult sendResult) 
                log.info("发送异步延时消息成功");
            

            @Override
            public void onException(Throwable throwable) 
                log.info("发送异步延时消息失败:",throwable.getMessage());
            
        ;
        Message<User> message=MessageBuilder.withPayload(user).build();
        //异步延时
        mqTemplate.asyncSend("topic5:tag1", message, sc, 3000, 3);
        //同步延时(少一个sendCallback)
        SendResult result=mqTemplate.syncSend("topic5:tag1", message, 3000, 3);
        log.info("发送同步延时消息成功:",result);
    

    /**
     * 顺序消息
     *使用hashcode对topic队列数量取模得到对应队列
     * 使消息按照顺序被消费,顺序与生产出来的顺序一致
     * 比如同一个订单生成,付费顺序需要一致,可以按照订单id来当作hashkey
     */
    @Override
    public void orderlyMQMessageSend() 
        String s1[]="tom","1";
        String s2[]="klee我和tom在同一个消费者被消费,而且在tom之后","1";
        String s3[]="lily我可能不会和tom在同一个消费者被消费","2";
        //同步顺序,也可以是其他类型比如异步顺序,单向顺序
        mqTemplate.syncSendOrderly("topic6:tag1", s1[0],s1[1]);
        mqTemplate.syncSendOrderly("topic6:tag1", s2[0],s2[1]);
        mqTemplate.syncSendOrderly("topic6:tag1", s3[0],s3[1]);
        log.info("单向消息已发送");
    

    /**
     * 过滤消息
     * Tag 过滤
     * Sql 过滤
     * Sql类型语法:
     * 数值比较,比如:>,>=,<,<=,BETWEEN,=;
     * 字符比较,比如:=,<>,IN;
     * IS NULL 或者 IS NOT NULL;
     * 逻辑符号 AND,OR,NOT;
     */
    @Override
    public void selectorMQSend() 
        //Tag过滤就是在发送参数上指定,比如topic1:tag1就指定了tag1,这种光topic1不指定就是所有tag
        //这里使用sql92
        User user=new User("tom",16);
        User user2=new User("klee",9);
        Message<User> message=MessageBuilder
                .withPayload(user)
                .setHeader("age", user.getAge())
                .build();
        Message<User> message2=MessageBuilder
                .withPayload(user)
                .setHeader("age", user2.getAge())
                .build();
        mqTemplate.syncSend("topic10", message);//age=16,消费者设置sql92过滤(header)头数据age=9
        mqTemplate.syncSend("topic10", message2);//age=9
        log.info("添加age头信息的过滤消息发送完毕");
    

    /**
     * 分布式事物消息
     *生产者需要一个监听自己的类
     */
    @Override
    public void transactionMQSend() 
        User user=new User("klee",9);
        Message<User> message=MessageBuilder
                .withPayload(user)
                .setHeader(RocketMQHeaders.TRANSACTION_ID, UUID.randomUUID())
                .build();
        TransactionSendResult result=mqTemplate.sendMessageInTransaction("topic15:tag1", message, null);
        if(result.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)
                &&
                result.getSendStatus().equals(SendStatus.SEND_OK))
           log.info("事物消息发送成功");

        
        log.info("事物消息发送结果:",result);
    

同步消息消费者

package com.ro.configure;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(consumerGroup = "consumer-group-1",topic = "topic1",selectorExpression = "tag1")
public class MQConsumer implements RocketMQListener<String> 
    Logger log= LoggerFactory.getLogger(MQConsumer.class);
    @Override
    public void onMessage(String s) 
        log.info("consumer-1 收到string类型消息:",s);
    


package com.ro.configure;
import com.ro.pojo.User;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(consumerGroup = "consumer-group-2",topic = "topic2",selectorExpression = "tag1")
public class MQConsumer implements RocketMQListener<User> 
    Logger log= LoggerFactory.getLogger(MQConsumer.class);
    @Override
    public void onMessage(User u) 
        log.info("consumer-2 收到user类型消息:",u);
    

异步消息消费者

代码与同步消费者相同

单向消息消费者

代码与同步消费者相同

延时消息消费者

代码与同步消费者相同

顺序消息消费者

只需要在之前的基础上指定consumeMode = ConsumeMode.ORDERLY

package com.ro.configure;
import com.ro.pojo.User;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(consumeMode = ConsumeMode.ORDERLY,consumerGroup = "consumer-orderly",topic = "topic6",selectorExpression = "tag1")
public class MQConsumerOrderly implements RocketMQListener<String> 
    Logger log= LoggerFactory.getLogger(MQConsumerOrderly.class);
    @Override
    public void onMessage(String s) 
        log.info("接收到顺序消息了:",s);
    

sql92过滤消息消费者

更多rocketmq集群方案安装配置请查看我的其他文章
https://blog.csdn.net/UnicornRe/article/details/117745226
需要在安装的配置文件xxx.conf加入

#支持sql92
enablePropertyFilter=true

sql语法

数值比较,比如:>>=<<=,BETWEEN,=;
字符比较,比如:=<>,IN;
IS NULL 或者 IS NOT NULL;
逻辑符号 AND,OR,NOT;
package com.ro.configure;
import com.ro.pojo.User;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
//selectorExpression指明了只能接收消息属性(header)中age的消息。
@RocketMQMessageListener(
        selectorType = SelectorType.SQL92,
        consumerGroup = "consumer-selector",
        topic = "topic10",
        selectorExpression = "age = 9")
public class MQConsumer implements RocketMQListener<User> 
    Logger log= LoggerFactory.getLogger(MQConsumer.class);
    @Override
    public void onMessage(User u) 
        log.info("接收到sql过滤消息了:",u);
    

事物消息消费者

发送方向 MQ 服务端发送消息。
MQ Server 将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息。
发送方开始执行本地事务逻辑。
发送方根据本地事务执行结果向 MQ Server 提交二次确认(Commit 或是 Rollback),MQ Server 收到 Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息;MQ Server 收到 Rollback 状态则删除半 消息,订阅方将不会接受该消息。
在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达 MQ Server,经过固定时间后 MQ Server 将对该消息发起消息回查。
发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
发送方根据检查得到的本地事务的最终状态再次提交二次确认,MQ Server 仍按照步骤4对半消息进行操作。

producer需要一个监听类

package com.ro.configure;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@RocketMQTransactionListener
public class TransactionListner implements RocketMQLocalTransactionListener 
    Logger log= LoggerFactory.getLogger(TransactionListner.class);
    //保障线程安全且保证高性能的hashmap,用来记录执行结果
    private static Map<String,RocketMQLocalTransactionState> transStateMap=new ConcurrentHashMap<>();
    //执行本地事物
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) 
        //方法里的object o这个对象是生产者发送消息方法最后一个参数的值
        String transId=(String)message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
        log.info("事物id:",transId);
        try 
            //模拟执行任务
            Thread.sleep查看详情  

[原创]springboot整合rocketmq消息队列

参考技术A什么情况下的异步操作需要使用消息队列而不是多线程?StartNameServerStartBrokerWindows下安装RocketMq:https://www.jianshu.com/p/4a275e779afaRocketMq名词解释:https://my.oschina.net/javamaster/blog/2051703解释Push与Pull区别:https://www.jianshu.com/p/f0... 查看详情

4springboot整合rocketmq实现消息发送和接收(代码片段)

我们使用主流的SpringBoot框架整合RocketMQ来讲解,使用方便快捷;最终项目结构如下:具体步骤如下:第一步:新建SpringBoot项目rocketmq-test,引入rocketmq依赖,以及项目配置<dependency> <groupId>org.apac... 查看详情

6springboot整合rocketmq发送异步消息(代码片段)

发送异步消息是指producer向broker发送消息时指定消息发送成功及发送异常的回调方法,调用API后立即返回,producer发送消息线程不阻塞,消息发送成功或失败的回调任务在一个新的线程中执行。相对发送同步消息,... 查看详情

10springboot整合rocketmq实现延迟消息(代码片段)

延迟消息对于消息中间件来说,producer将消息发送到mq的服务器上,但并不希望这条消息马上被消费,而是推迟到当前时间节点之后的某个时间点,再将消息投递到queue中让consumer进行消费。​延迟消息的使用场景... 查看详情

9springboot整合rocketmq实现顺序消息(代码片段)

rocketmq默认发送的消息是进入多个消息队列,然后消费端多线程并发消费,所以默认情况,不是顺序消费消息的;有时候,我们需要实现顺序消费一批消息,比如电商系统,订单创建,支付,完... 查看详情

7springboot整合rocketmq发送单向消息(代码片段)

发送单向消息是指producer向broker发送消息,执行API时直接返回,不等待broker服务器的结果。这种方式主要用在不特别关心发送结果的场景,举例:日志发送;RocketMQTemplate给我们提供了sendOneWay方法(有多个重载)&#... 查看详情

springboot实战项目整合阿里云rocketmq消息队列实现发送普通消息,延时消息(代码片段)

原文地址:Springboot实战项目整合阿里云RocketMQ消息队列实现发送普通消息,延时消息--附代码-学不会丶-博客园一.为什么选择RocketMQ消息队列?(可跳过看三的整合代码实例)首先RocketMQ是阿里巴巴自研出来的&#... 查看详情

13springboot整合rocketmq实现过滤消息-根据sql表达式过滤消息(代码片段)

SQL表达式方式可以根据发送消息时输入的属性进行一些计算。RocketMQ的SQL表达式语法只定义了一些基本的语法功能。数字比较,如>,>=,<,<=,BETWEEN,=;字符比较,如:=&#x... 查看详情

springboot整合rocketmq

参考技术AApacheRocketMQ作为阿里开源的一款高性能、高吞吐量的分布式消息中间件。支持Broker和Consumer端消息过滤,支持发布订阅模型和点对点,支持拉pull和推push两种消息模式,单一队列百万消息、亿级消息堆积,支持单master节... 查看详情

12springboot整合rocketmq实现过滤消息-根据tag方式过滤消息(代码片段)

消息发送端只能设置一个tag,消息接收端可以设置多个tag。接收消息端通过‘||’设置多个tag,如下:tag1||tag2||tag3||…上实例,生产端发送三个消息,TAG分别是TAG1,TAG2,TAG3/***发送带Tag消息,测试根据Tag过滤消息*/... 查看详情

5springboot整合rocketmq发送同步消息(代码片段)

发送同步消息是指producer向broker发送消息,执行API时同步等待,直到broker服务器返回发送结果;相对异步发送消息,同步会阻塞线程,性能相对差点,但是可靠性高,这种方式得到广泛使用,比如&#x... 查看详情

springboot系列-springboot整合rabbitmq

一RabbitMQ的介绍    RabbitMQ是消息中间件的一种,消息中间件即分布式系统中完成消息的发送和接收的基础软件.这些软件有很多,包括ActiveMQ(apache公司的),RocketMQ(阿里巴巴公司的,现已经转让给apache).  消息中间件的工作过程可... 查看详情

rocketmq(20)——整合springboot(代码片段)

整合SpringBootApacheRocketMQ提供了对SpringBoot的支持,目前最新的整合版本使用的是RocketMQ4.5.1版本,使用SpringBoot是2.0.5版本。整合SpringBoot需要引入rocketmq-spring-boot-starter依赖,目前最新版本是2.0.3。<dependency><groupId>org.apache.rock... 查看详情

rocketmq(20)——整合springboot(代码片段)

整合SpringBootApacheRocketMQ提供了对SpringBoot的支持,目前最新的整合版本使用的是RocketMQ4.5.1版本,使用SpringBoot是2.0.5版本。整合SpringBoot需要引入rocketmq-spring-boot-starter依赖,目前最新版本是2.0.3。<dependency><groupId>org.apache.ro 查看详情

springboot整合rocketmq

...tMQ(2)---Docker部署RocketMQ集群这篇在上篇搭建好的基础上,将SpringBoot整合RocketMQ实现生产消费。GitHub地址: https://github.com/yudiandemingzi/spring-boot-study一、搭建步骤先说下技术大致架构SpringBoot2.1.6+Mav 查看详情

rocketmq(二十四)整合springboot(代码片段)

1、官网RocketMQ为SpringBoot提供了整合方案,官网地址如下,上面提供了详细的整合步骤及案例。https://github.com/apache/rocketmq-spring官方详细文档(可以切换不同的版本)https://github.com/apache/rocketmq-spring/blob/release-2.0.1 查看详情

springboot整合rocketmq实现入门案例

...习了Spring整合RocketMQ的第一个案例!现在我们来学习SpringBoot如何整合RocketMQ实现更加简单的使用!文章目录1创建maven项目2配置文件3生产者4消费者5测试1创建maven项目创建一个maven项目。引入sp 查看详情