rocketmq(二十四)整合springboot(代码片段)

爱是与世界平行 爱是与世界平行     2022-10-23     538

关键词:

1、官网

RocketMQ为 SpringBoot 提供了整合方案,官网地址如下,上面提供了详细的整合步骤及案例。

https://github.com/apache/rocketmq-spring

官方详细文档(可以切换不同的版本)

https://github.com/apache/rocketmq-spring/blob/release-2.0.1/README.md

2、消息生产者

1)添加依赖

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.2.1</version>
    </dependency>
</dependencies>

2)配置文件

rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876
rocketmq.producer.group=my-group

3)启动类

@SpringBootApplication
public class MQProducerApplication 
    public static void main(String[] args) 
        SpringApplication.run(MQSpringBootApplication.class);
    

4)测试类

@RunWith(SpringRunner.class)
@SpringBootTest(classes = MQSpringBootApplication.class)
public class ProducerTest 
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    @Test
    public void test1()
        rocketMQTemplate.convertAndSend("springboot-mq","hello springboot rocketmq");
    

3、消息消费者

1)添加依赖

同消息生产者

2)配置文件

同消息生产者

3)启动类

@SpringBootApplication
public class MQConsumerApplication 
    public static void main(String[] args) 
        SpringApplication.run(MQSpringBootApplication.class);
    

4)消息监听器

@Slf4j
@Component
@RocketMQMessageListener(topic = "springboot-mq",consumerGroup = "springboot-mq-consumer-1")
public class Consumer implements RocketMQListener<String> 
    @Override
    public void onMessage(String message) 
        log.info("Receive message:"+message);
    

5)RocketMQMessageListener参数

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RocketMQMessageListener 
    String NAME_SERVER_PLACEHOLDER = "$rocketmq.name-server:";
    String ACCESS_KEY_PLACEHOLDER = "$rocketmq.consumer.access-key:";
    String SECRET_KEY_PLACEHOLDER = "$rocketmq.consumer.secret-key:";
    String TRACE_TOPIC_PLACEHOLDER = "$rocketmq.consumer.customized-trace-topic:";
    String ACCESS_CHANNEL_PLACEHOLDER = "$rocketmq.access-channel:";
    /**
     * 消费者分组
     *
     * @return
     */
    String consumerGroup();
    /**
     * 主题
     */
    String topic();
    /**
     * selectorType:消息选择器类型
     * - SelectorType.TAG:默认值,根据TAG选择,仅支持表达式格式如:“tag1 || tag2 || tag3”,如果表达式为null或者“*”标识订阅所有消息
     * - SelectorType.SQL92:根据SQL92表达式选择
     */
    SelectorType selectorType() default SelectorType.TAG;
    /**
     * selectorType 对应的表达式
     */
    String selectorExpression() default "*";
    /**
     * consumeMode:消费模式
     * - ConsumeMode.CONCURRENTLY:默认值,并行处理
     * - ConsumeMode.ORDERLY:按顺序处理
     */
    ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;
    /**
     * messageMode:消息模型
     * - MessageModel.CLUSTERING:默认值,集群
     * - MessageModel.BROADCASTING:广播
     */
    MessageModel messageModel() default MessageModel.CLUSTERING;
    /**
     * 最大线程数,默认值 64
     */
    int consumeThreadMax() default 64;
    /**
     * 消费失败,最大重试次数
     * <p>
     * - 在并发模式中,-1表示16
     * - 在有序模式中,-1表示整数最大值
     */
    int maxReconsumeTimes() default -1;
    /**
     * 消息可能阻止使用线程的最长时间(分钟)
     */
    long consumeTimeout() default 15L;
    /**
     * 发送回复消息超时
     */
    int replyTimeout() default 3000;
    /**
     * 默认值 $rocketmq.consumer.access-key:
     */
    String accessKey() default ACCESS_KEY_PLACEHOLDER;
    /**
     * 默认值 $rocketmq.consumer.secret-key:
     */
    String secretKey() default SECRET_KEY_PLACEHOLDER;
    /**
     * 启用消息轨迹,默认值 false
     */
    boolean enableMsgTrace() default false;
    /**
     * 自定义的消息轨迹主题,默认值$rocketmq.consumer.customized-trace-topic:
     * 没有配置此配置项则使用默认的主题
     */
    String customizedTraceTopic() default TRACE_TOPIC_PLACEHOLDER;
    /**
     * 命名服务器地址,默认值$rocketmq.name-server:
     */
    String nameServer() default NAME_SERVER_PLACEHOLDER;
    /**
     * 默认值$rocketmq.access-channel:
     */
    String accessChannel() default ACCESS_CHANNEL_PLACEHOLDER;

4、发送同步消息

同步消息也就这些API,简单讲解一下!

//发送普通同步消息-Object
syncSend(String destination, Object payload)
//发送普通同步消息-Message
syncSend(String destination, Message<?> message)
//发送批量普通同步消息
syncSend(String destination, Collection<T> messages)
//发送普通同步消息-Object,并设置发送超时时间
syncSend(String destination, Object payload, long timeout)
//发送普通同步消息-Message,并设置发送超时时间
syncSend(String destination, Message<?> message, long timeout)
//发送批量普通同步消息,并设置发送超时时间
syncSend(String destination, Collection<T> messages, long timeout)
//发送普通同步延迟消息,并设置超时,这个下文会演示
syncSend(String destination, Message<?> message, long timeout, int delayLevel)
@Setter
@Getter
@Accessors(chain = true)
@AllArgsConstructor
@NoArgsConstructor
public class MsgTest 
    private int id;
    private String context;
    private Date date;

/**
 * 同步消息-
 */
@Test
void syncSendStr() 
    //syncSend和send是等价的
    rocketMQTemplate.syncSend("first-topic-str", "hello world test1");
    //send底层还是会调用syncSend的代码
    rocketMQTemplate.send("first-topic-str", MessageBuilder.withPayload("hello world test1").build());
    SendResult res = rocketMQTemplate.syncSend("first-topic-str:tag1", "hello world test2");
    log.info("syncSend===>",res);

/**
 * 同步消息-
 */
@Test
void syncSendPojo() 
    MsgTest msg = new MsgTest(1,"hello world test3",new Date());
    SendResult res = rocketMQTemplate.syncSend("first-topic-pojo", MessageBuilder.withPayload(msg).build());
    log.info("syncSend===>",res);

这里存在两种消息体,一种是Object的,另一种是Message<?>的形式的,其实我们发送Object的时候,底层是有帮我们做转换的,其实和我们在上层调用

MessageBuilder.withPayload("hello world test1").build()

是一样的!源码如下

5、异步消息

//发送普通异步消息-Object
asyncSend(String destination, Object payload, SendCallback sendCallback)
//发送普通异步消息-Message
asyncSend(String destination, Message<?> message, SendCallback sendCallback)
//发送普通异步消息-Object,并设置发送超时时间
asyncSend(String destination, Object payload, SendCallback sendCallback, long timeout)
//发送普通异步消息-Message,并设置发送超时时间
asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout)
//发送普通异步延迟消息,并设置超时,这个下文会演示
asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout, int delayLevel)
/**
 * 异步消息-String
 * 指发送方发出数据后,不等接收方发回响应,接着发送下个数据包
 * 关键实现异步发送回调接口(SendCallback)
 * 在执行消息的异步发送时应用不需要等待服务器响应即可直接返回,通过回调接口接收务器响应,并对服务器的响应结果进行处理
 * 这种方式任然需要返回发送消息任务的执行结果,异步不影响后续任务,不会造成阻塞
 */
@Test
void asyncSendStr() 
    rocketMQTemplate.asyncSend("first-topic-str:tag1", "hello world test2 asyncSendStr", new SendCallback() 
        @Override
        public void onSuccess(SendResult sendResult) 
            log.info("异步消息发送成功:", sendResult);
        
        @Override
        public void onException(Throwable throwable) 
            log.info("异步消息发送失败:", throwable.getMessage());
        
    );

6、单向消息

这里普通单向消息就只有两个操作空间,这个不用多说了,一个是Object,另一个是Message

/**
 * 单向消息
 * 特点为只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答
 * 此方式发送消息的过程耗时非常短,一般在微秒级别
 * 应用场景:适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集
 */
@Test
void sendOneWayStr() 
    rocketMQTemplate.sendOneWay("first-topic-str:tag1", "hello world test2 sendOneWayStr");
    log.info("单向消息已发送");

6、批量消息

/**
 * 批量消息
 */
@Test
void asyncSendBatch() 
    Message<String> msg = MessageBuilder.withPayload("hello world test1").build();
    List<Message> msgList = Arrays.asList(msg,msg,msg,msg,msg);
    SendResult res = rocketMQTemplate.syncSend("first-topic-str:tag1", msgList);
    log.info("批量消息");

7、延迟消息

1)同步延迟消息

/**
 * 同步延迟消息
 * rocketMQ的延迟消息发送其实是已发送就已经到broker端了,然后消费端会延迟收到消息。
 * RocketMQ 目前只支持固定精度的定时消息。
 * 固定等级:1到18分别对应1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
 * 延迟的底层方法是用定时任务实现的。
 */
@Test
void syncSendDelayedStr() 
    Message<String> message = MessageBuilder.withPayload("syncSendDelayedStr" + new Date()).build();
    /**
     * @param destination formats: `topicName:tags`
     * @param message 消息体
     * @param timeout 发送超时时间
     * @param delayLevel 延迟级别  1到18
     * @return @link SendResult
     */
    SendResult res = rocketMQTemplate.syncSend("first-topic-str:tag1", message, 3000, 3);
    log.info("res==>", res);

2)异步延迟消息

/**
 * 异步延迟消息
 */
@Test
void asyncSendDelayedStr() 
    //Callback
    SendCallback sc=new SendCallback() 
        @Override
        public void onSuccess(SendResult sendResult) 
            log.info("发送异步延时消息成功");
        
        @Override
        public void onException(Throwable throwable) 
            log.info("发送异步延时消息失败:",throwable.getMessage());
        
    ;
    Message<String> message= MessageBuilder.withPayload("asyncSendDelayedStr").build();
    rocketMQTemplate.asyncSend("first-topic-str:tag1", message, sc, 3000, 3);

8、顺序消息

使用rocketmq-spring-boot-starter发送顺序消息就比较方便了,不像使用rocket-client那样,需要手动获取RocketMQ中当前topic的队列个数然后再通过hashKey值,mqs.size()取模,得到一个索引值,这里底层都帮我们做好了处理!

/**
 * 顺序消息
 */
@Test
void SendOrderStr() 
    List<MsgTest> msgList = new ArrayList<>();
    for (int i = 0; i < 100; i++) 
        msgList.add(new MsgTest(100, "我是id为100的第" + (i + 1) + "条消息", new Date()));
    
    msgList.forEach(t -> 
        rocketMQTemplate.asyncSendOrderly("first-topic-str:tag1", t, String.valueOf(t.getId()), new SendCallback() 
            @Override
            public void onSuccess(SendResult sendResult) 
                log.info("异步消息发送成功:", sendResult);
            
            @Override
            public void onException(Throwable throwable) 
                log.info("异步消息发送失败:", throwable.getMessage());
            
        );
    );

9、事务消息

1)发送者

/**
 * 事务消息  注意这里还有一个监听器 TransactionListenerImpl
 */
@Test
void sendTransactionStr() 
    String[] tags = "TAGA", "TAGB", "TAGC";
    for (int i = 0; i < 3; i++) 
        Message<String> message = MessageBuilder.withPayload("事务消息===>" + i).build();
        TransactionSendResult res = rocketMQTemplate.sendMessageInTransaction("transaction-str:" + tags[i], message, i + 1);
        if (res.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE) && res.getSendStatus().equals(SendStatus.SEND_OK)) 
            log.info("事物消息发送成功");
        
        log.info("事物消息发送结果:", res);
    

2)事务消息生产者端的消息监听器

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RocketMQTransactionListener
public class TransactionListenerImpl implements RocketMQLocalTransactionListener 
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) 
        log.info("executeLocalTransaction")查看详情  

springboot整合rocketmq

参考技术A运行脚本org.apache.rocketmq.common.message.MessageExt属性说明: 查看详情

rocketmq(20)——整合springboot(代码片段)

整合SpringBootApacheRocketMQ提供了对SpringBoot的支持,目前最新的整合版本使用的是RocketMQ4.5.1版本,使用SpringBoot是2.0.5版本。整合SpringBoot需要引入rocketmq-spring-boot-starter依赖,目前最新版本是2.0.3。<dependency><groupId>org.apache.rock... 查看详情

rocketmq(20)——整合springboot(代码片段)

整合SpringBootApacheRocketMQ提供了对SpringBoot的支持,目前最新的整合版本使用的是RocketMQ4.5.1版本,使用SpringBoot是2.0.5版本。整合SpringBoot需要引入rocketmq-spring-boot-starter依赖,目前最新版本是2.0.3。<dependency><groupId>org.apache.ro 查看详情

springboot入门二十四,application事件监听

SpringBootApplication共支持6种事件监听,分别是://1.Spring最开始启动的时候触发ApplicationStartingEvent//2.Spring已经准备好上下文但是上下文尚未创建的时候触发ApplicationEnvironmentPreparedEvent//3.Bean定义加载之后、刷新上下文之前触发Applicatio... 查看详情

springboot整合rocketmq实现入门案例

...习了Spring整合RocketMQ的第一个案例!现在我们来学习SpringBoot如何整合RocketMQ实现更加简单的使用!文章目录1创建maven项目2配置文件3生产者4消费者5测试1创建maven项目创建一个maven项目。引入sp 查看详情

springboot整合rocketmq(代码片段)

项目环境jdk:1.8rocketmq:4.5.1springboot:2.6.3引入依赖<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId>& 查看详情

springcloud-springcloud之stream构建消息驱动微服务框架;springcloudalibaba集成rocketmq(二十四)(代码片段)

阅读本文前可先参考SpringCloud-SpringCloud之Stream构建消息驱动微服务框架;RabbitMQ(十九)_MinggeQingchun的博客-CSDN博客_springcloudstream一、SpringCloudStream在微服务的开发过程中,会经常用到消息中间件,通过消息中间... 查看详情

4springboot整合rocketmq实现消息发送和接收(代码片段)

我们使用主流的SpringBoot框架整合RocketMQ来讲解,使用方便快捷;最终项目结构如下:具体步骤如下:第一步:新建SpringBoot项目rocketmq-test,引入rocketmq依赖,以及项目配置<dependency> <groupId>org.apac... 查看详情

springboot入门二十三,整合redis

  项目基本配置参考文章SpringBoot入门一,使用myEclipse新建一个SpringBoot项目,使用MyEclipse新建一个SpringBoot项目即可,此示例springboot升级为2.2.1版本。1.pom.xml添加Redis支持<!--5.引入redis依赖--><dependency><groupId>org.springframe... 查看详情

springboot2.x整合rocketmq4.x

开发生产者代码第一步:创建很普通的SpringBoot项目第二步:加入相关依赖<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.3.0</version></ 查看详情

springboot整合rocketmq的各种消息类型,生产者,消费者(代码片段)

文章目录Springboot整合使用pom依赖yml整合使用同步消息消费者异步消息消费者单向消息消费者延时消息消费者顺序消息消费者sql92过滤消息消费者事物消息消费者@RocketMQMessageListener参数解释我的rocketmq各种集群方案安装Springboot... 查看详情

[原创]springboot整合rocketmq消息队列

参考技术A什么情况下的异步操作需要使用消息队列而不是多线程?StartNameServerStartBrokerWindows下安装RocketMq:https://www.jianshu.com/p/4a275e779afaRocketMq名词解释:https://my.oschina.net/javamaster/blog/2051703解释Push与Pull区别:https://www.jianshu.com/p/f0... 查看详情

springboot(十四):springboot整合shiro-登录认证和权限管理

这篇文章我们来学习如何使用SpringBoot集成ApacheShiro。安全应该是互联网公司的一道生命线,几乎任何的公司都会涉及到这方面的需求。在Java领域一般有SpringSecurity、ApacheShiro等安全框架,但是由于SpringSecurity过于庞大和复... 查看详情

二十一springboot2核心技术——整合activiti7(代码片段)

一、Activiti7与SpringBoot整合Activiti7发布正式版之后,它与SpringBoot2.x已经完全支持整合开发。1.1、SpringBoot整合Activiti7的配置为了能够实现SpringBoot与Activiti7整合开发,首先我们要引入相关的依赖支持。在工程的pom.xml文件中引... 查看详情

11springboot整合rocketmq实现事务消息(代码片段)

事务消息是RocketMQ提供的非常重要的一个特性,在4.x版本之后开源,可以利用事务消息轻松地实现分布式事务。RocketMQ在其消息定义的基础上,对事务消息扩展了两个相关的概念:Half(Prepare)Message——半消息(预处理... 查看详情

springboot实战项目整合阿里云rocketmq消息队列实现发送普通消息,延时消息(代码片段)

原文地址:Springboot实战项目整合阿里云RocketMQ消息队列实现发送普通消息,延时消息--附代码-学不会丶-博客园一.为什么选择RocketMQ消息队列?(可跳过看三的整合代码实例)首先RocketMQ是阿里巴巴自研出来的&#... 查看详情

二十springboot2核心技术——整合logback(代码片段)

一、SpringBoot整合logback1.1、日志文件SpringBoot官方推荐优先使用带有-spring的文件名作为你的日志配置(如使用logback-spring.xml,而不是logback.xml),命令为logback-spring.xml的日志配置文件。默认的命名规则,并且放... 查看详情