关键词:
前言
本节内容是关于RocketMQ消息中间键的实战内容,主要介绍在springboot项目中如何集成使用RocketMQ消息中间键,包括消息的发送、消息的接收以及RocketMQ的一些配置说明,以及效果说明。话不多说,开始实战内容。
正文
- 搭建RocketMQ集群
参考博客:Docker环境下使用docker-compose一键式搭建RocketMQ(4.5.0版本)集群及其管理工具(外网版)_北溟溟的博客-CSDN博客
-
引入RocketMQ的pom依赖
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.2</version> </dependency>
- 在配置文件application.yml引入RocketMQ配置
#rocketmq配置 rocketmq: name-server: 192.168.56.10:9876;192.168.56.10:9877 producer: #生产者组名称 group: atp-producer #命名空间 namespace: atp #异步消息发送失败重试次数,默认是2 retry-times-when-send-async-failed: 2 #发送消息超时时间,默认2000ms send-message-timeout: 2000 #消息的最大长度:默认1024 * 1024 * 4(默认4M) max-message-size: 40000000 #压缩消息阈值,超过4k就压缩 compress-message-body-threshold: 4096 #是否发送失败,重试另外的broker retry-next-server: false #是否启用消息追踪 enable-msg-trace: false #默认追踪的主题 customized-trace-topic: RMQ_SYS_TRACE_TOPIC #消息发送失败重试的次数 retry-times-when-send-failed: 2
- 创建消息的生产者
①创建RocketMQ消息的生产者RocketProducer.java
package com.yundi.atp.platform.rocketmq; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.producer.*; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; @Component @Slf4j public class RocketProducer @Autowired private RocketMQTemplate rocketMQTemplate; /** * 发送同步消息:消息响应后发送下一条消息 * * @param topic 消息主题 * @param tag 消息tag * @param key 业务号 * @param data 消息内容 */ public void sendSyncMsg(String topic, String tag, String key, String data) //消息 Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build(); //主题 String destination = topic + ":" + tag; SendResult sendResult = rocketMQTemplate.syncSend(destination, message); log.info("【RocketMQ】发送同步消息:", sendResult); /** * 发送异步消息:异步回调通知消息发送的状况 * * @param topic 消息主题 * @param tag 消息tag * @param key 业务号 * @param data 消息内容 */ public void sendAsyncMsg(String topic, String tag, String key, String data) //消息 Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build(); //主题 String destination = topic + ":" + tag; rocketMQTemplate.asyncSend(destination, message, new SendCallback() @Override public void onSuccess(SendResult sendResult) log.info("【RocketMQ】发送异步消息:", sendResult); @Override public void onException(Throwable e) log.info("【RocketMQ】发送异步消息异常:", e.getMessage()); ); /** * 发送单向消息:消息发送后无响应,可靠性差,效率高 * * @param topic 消息主题 * @param tag 消息tag * @param key 业务号 * @param data 消息内容 */ public void sendOneWayMsg(String topic, String tag, String key, String data) //消息 Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build(); //主题 String destination = topic + ":" + tag; rocketMQTemplate.sendOneWay(destination, message); /** * 同步延迟消息 * * @param topic 主题 * @param tag 标签 * @param key 业务号 * @param data 消息体 * @param timeout 发送消息的过期时间 * @param delayLevel 延迟等级-----固定等级:1到18分别对应1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h */ public void sendSyncDelayMsg(String topic, String tag, String key, String data, long timeout, int delayLevel) //消息 Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build(); //主题 String destination = topic + ":" + tag; SendResult sendResult = rocketMQTemplate.syncSend(destination, message, timeout, delayLevel); log.info("【RocketMQ】发送同步延迟消息:", sendResult); /** * 异步延迟消息 * * @param topic 主题 * @param tag 标签 * @param key 业务号 * @param data 消息体 * @param timeout 发送消息的过期时间 * @param delayLevel 延迟等级-----固定等级:1到18分别对应1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h */ public void sendAsyncDelayMsg(String topic, String tag, String key, String data, long timeout, int delayLevel) //消息 Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build(); //主题 String destination = topic + ":" + tag; rocketMQTemplate.asyncSend(destination, message, new SendCallback() @Override public void onSuccess(SendResult sendResult) log.info("【RocketMQ】发送异步延迟消息:", sendResult); @Override public void onException(Throwable e) log.info("【RocketMQ】发送异步延迟消息异常:", e.getMessage()); , timeout, delayLevel); /** * 同步顺序消息 * * @param topic 主题 * @param tag 标签 * @param key 业务号 * @param data 消息体 */ public void sendSyncOrderlyMsg(String topic, String tag, String key, String data) //消息 Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build(); //主题 String destination = topic + ":" + tag; SendResult sendResult = rocketMQTemplate.syncSendOrderly(destination, message, key); log.info("【RocketMQ】发送同步顺序消息:", sendResult); /** * 异步顺序消息 * * @param topic 主题 * @param tag 标签 * @param key 业务号 * @param data 消息体 */ public void sendAsyncOrderlyMsg(String topic, String tag, String key, String data) //消息 Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build(); //主题 String destination = topic + ":" + tag; rocketMQTemplate.asyncSendOrderly(destination, message, key, new SendCallback() @Override public void onSuccess(SendResult sendResult) log.info("【RocketMQ】发送异步顺序消息:", sendResult); @Override public void onException(Throwable e) log.info("【RocketMQ】发送异步顺序消息异常:", e.getMessage()); );
②创建消息发送的web接口
package com.yundi.atp.platform.module.test.controller; import com.yundi.atp.platform.common.Result; import com.yundi.atp.platform.rocketmq.RocketConstant; import com.yundi.atp.platform.rocketmq.RocketProducer; import io.swagger.annotations.Api; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.UUID; @Api(tags = "Springboot集成RocketMQ测试") @RestController @RequestMapping(value = "/rocket") public class RocketContoller @Autowired private RocketProducer rocketProducer; @GetMapping(value = "/sendRocketTestMsg/topic/msg") public Result sendKafkaTestMsg(@PathVariable(value = "topic") String topic, @PathVariable(value = "msg") String msg) rocketProducer.sendSyncMsg(topic, RocketConstant.ROCKET_TAG, UUID.randomUUID().toString(), msg); return Result.success();
③创建前端消息发送界面
<template> <div class="container"> <div class="title"> <span>Springboot集成RocketMQ案例</span> <el-divider direction="vertical"></el-divider> <router-link to="home"> <span style="font-size: 18px;">退出</span> </router-link> </div> <el-divider>Test Staring</el-divider> <div style="width: 400px;background: #ddd;padding: 40px 20px;"> <el-form ref="form" :model="form" label-width="70px" class="login"> <el-form-item label="发送主题" prop="topic"> <el-input v-model="form.topic"></el-input> </el-form-item> <el-form-item label="发送消息" prop="msg"> <el-input v-model="form.msg"></el-input> </el-form-item> <el-button type="primary" @click="sengMsg" style="width: 100%;margin: 0;">立即发送</el-button> </el-form> </div> </div> </template> <script> export default name: "RocketMQ", data() return form: topic: 'atp', msg: 'hello world!', , methods: sengMsg() const _this = this; _this.$http.get('/rocket/sendRocketTestMsg/'+this.form.topic+"/"+this.form.msg,).then(res => if (res.data.code === 1) _this.$refs['form'].resetFields(); _this.$message.success(res.data.msg); else _this.$refs['form'].resetFields(); _this.$message.warning(res.data.msg); ).catch(error => _this.$message.error(error); ); </script> <style scoped lang="scss"> .container padding: 10px; </style>
- 创建消息的消费者
①创建消费者RocketConsumer.java
package com.yundi.atp.platform.rocketmq; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; /** * @Description: rocket消费者 * * consumerGroup:消费者组 * topic:消费主题 * selectorType: 选则器模式,默认SelectorType.TAG * selectorExpression:消费模式的值 * consumeMode:消费者模式 CONCURRENTLY(默认消费模式) ORDERLY(顺序消费) * messageModel:消息模式 CLUSTERING(集群模式) BROADCASTING(广播模式) * consumeThreadNumber:消费者线程数 * maxReconsumeTimes:最大重复消费次数 * consumeTimeout:消费过期时间 * replyTimeout:重试时间 * enableMsgTrace:是否允许消息追踪 * customizedTraceTopic:自定义的消息追踪主题 * nameServer:注册服务器 * namespace:名称空间 * * @Date: 2021/11/3 18:31 * @Version: 1.0.0 */ @Slf4j @Component @RocketMQMessageListener(consumerGroup = RocketConstant.ROCKET_CONSUMER_GROUP, topic = RocketConstant.ROCKET_TOPIC, selectorExpression = RocketConstant.ROCKET_TAG, namespace = RocketConstant.ROCKET_NAMESPACE) public class RocketConsumer implements RocketMQListener<String> @Override public void onMessage(String message) log.info("message-------------------------------------->:", message);
②消息常量定义RocketConstant.java
package com.yundi.atp.platform.rocketmq; /** * @Description: rocketmq常量定义 * @Date: 2022/10/20 14:24 * @Version: 1.0.0 */ public class RocketConstant /** * 消费者组 */ public final static String ROCKET_CONSUMER_GROUP = "atp-consumer"; /** * 主题 */ public final static String ROCKET_TOPIC = "atp"; /** * tag */ public final static String ROCKET_TAG = "app"; /** * 名称空间 */ public final static String ROCKET_NAMESPACE = "atp";
- 验证结果
①启动RocketMQ集群,保证RocketMQ是启动状态
②启动后端与前端服务
③发送消息测试,控制台有消息,后端也可以消费到消息
结语
至此,关于springboot集成RocketMQ案例实战,我们下期见。。。
(十八)atp应用测试平台——关于springboot应用监控的那些事(代码片段)
...,快快告诉我你的绝招。本节内容我们主要介绍一下springboot应用的常见应用参数监控指标,从而更好的关注springboot应用的运行状况并实现应用的监控。除此之外,我们通过集成一 查看详情
(十三)atp应用测试平台——springboot集成kafka案例实战(代码片段)
...:①削峰填谷②异步解耦。本节我们主要介绍一下如何在springboot项目中集成kafka消息中间键,实现简单的数据分发以及消费的案例。正文kafka集群搭建快速搭建一个kafka集群,我们这里以docker 查看详情
atp应用测试平台——使用bat批处理实现springboot项目的启动与关闭(代码片段)
前言在windows环境中实现springboot项目的启停,我们可以通过cmd控制台来实现,更为优雅的方式是我们通过批处理文件来处理,这样可以屏蔽项目启动关停的难度,实现一键式的启停,并且自行配置java环境,... 查看详情
(十七)atp应用测试平台——自定义实现一个springboot2的线程池启动器starter
前言启动器是springboot的一大特点,我们可以根据项目自身需求按需装配我们的组件。例如我们需要操作redis,项目中可以添加一个redis的启动器spring-boot-starter-data-redis,这样redis的一些客户端操作功能我们就集成好了... 查看详情
(十七)atp应用测试平台——自定义实现一个springboot2的线程池启动器starter(代码片段)
前言启动器是springboot的一大特点,我们可以根据项目自身需求按需装配我们的组件。例如我们需要操作redis,项目中可以添加一个redis的启动器spring-boot-starter-data-redis,这样redis的一些客户端操作功能我们就集成好了... 查看详情
(十四)atp应用测试平台——使用docker-compose一键式安装atp应用测试平台的依赖服务(代码片段)
前言关于ATP应用服务测试平台的相关内容已经更新不少,下载项目的小伙伴第一时间一定是想着怎么把这个平台项目跑起来,看下小编花里胡哨的效果是否能正常show。不过由于依赖的增多,项目的服务也随之多了起... 查看详情
(二十四)atp应用测试平台——springboot集成fastdfs上传与下载功能(代码片段)
前言本节内容我们主要介绍一下如何在springboot项目中集成fastdfs组件,实现文件的上传与下载。关于fastdfs服务中间键的安装过程,本节内容不做介绍。fastdfs是一个轻量级的分布式文件系统,也是我们文件存储中常常... 查看详情
atp应用测试平台——关于axios的配置使用(代码片段)
前言该篇是应某个粉丝的要求讲解一下关于axios请求组件的使用,其实axios组件类似我们以前使用过的jQuery中的ajax组件,都是用于进行http网络请求的组件。axios组件在前后端分离项目中使用的更加广泛,也更易集成... 查看详情
atp应用测试平台——关于网页表格的打印及pdf下载的实战案例(代码片段)
前言在网站应用中,我们可能会有这样一个需求,将网页的部分内容,例如表格,网页片段下载打印或者导出PDF,本小节内容正是关于这样一个内容的实战,基于vue2环境开发,希望能够帮助到你。源码... 查看详情
atp应用测试平台——关于vue-router前端路由的配置使用案例(代码片段)
前言VueRouter是Vue.js(opensnewwindow)官方的路由管理器。它和Vue.js的核心功能深度集成,功能丰富,是我们构建vue前端项目中必不可少的的组件之一,本节我们简单介绍一下在vue项目中如何使用vue-router组件实现页面的路由... 查看详情
atp应用测试平台——关于vue中vue-quill-editormavon-editortinymce等多种富文本编辑器的集成使用(代码片段)
...or以及tinymce等的集成及使用。源代码依然托管在我们的ATP应用测试平台中,源码地址:https://gitee.com/northcangap/atp,仅供参考使用。富文本编辑器实现效果如下: 查看详情
atp应用测试平台——使用easyexcel实现excel导入导出多sheet填充模板下载等功能案例实战(代码片段)
前言Java开发中实现Excel的导入、导出、填充、多sheet页操作等常用功能也是我们经常要面对的开发需求,本文以easyexcel为例,将excel中的常用功能整理成一个个小案例,参考使用。案例源码地址:https://gitee.com/northc... 查看详情
(二十)atp应用测试平台——websocket实现微服务版在线客服聊天室实战案例(代码片段)
前言在前面的博客内容中我们介绍了如何使用websocket实现一个网页版的在线客服聊天室,众所周知,由于websocket是一个长连接,要和服务端保持会话连接,所以其本身并不适用于微服务环境,在微服务环境中... 查看详情
(十七)atp应用测试平台——redis实现api接口访问限流(固定窗口限流算法)(代码片段)
前言开始正文之前,大多数情况下应该有这样一段场景。面试官:说说平常在项目中,你是如何使用redis的?我:我们就很简单啦,比如前后端分离token的存储、短信验证码的存储,权限列表的存储... 查看详情
(二十一)atp应用测试平台——vue实战之大红灯笼高高挂(代码片段)
前言2022年的最后一天班,好的开始,好的结束。把大红灯笼高高挂起来,欢度元旦的到来,兔年的到来。明年再战。明年再见。。。 正文正菜奉上lantern.vue灯笼源码<template><divclass="app"><divclass... 查看详情
(二十一)atp应用测试平台——vue实战之大红灯笼高高挂(代码片段)
前言2022年的最后一天班,好的开始,好的结束。把大红灯笼高高挂起来,欢度元旦的到来,兔年的到来。明年再战。明年再见。。。 正文正菜奉上lantern.vue灯笼源码<template><divclass="app"><divclass... 查看详情
atp应用测试平台——使用vue-video-player视频播放组件实现网页视频流的播放案例实战(代码片段)
前言在网页中播放视频也是我们经常要使用到的功能,例如设备监控的视频流实时播放,MP4、m3u8等视频资源播放等等,在vue项目中,我们可以使用目前封装好的开源组件vue-video-player实现上述的要求。本节我们就... 查看详情
springboot(十九):使用springbootactuator监控应用
springBoot(十九):使用SpringBootActuator监控应用微服务的特点决定了功能模块的部署是分布式的,大部分功能模块都是运行在不同的机器上,彼此通过服务调用进行交互,前后台的业务流会经过很多个微服务的处理和传递,出现了异... 查看详情