rocketmq-spring:实战与源码解析一网打尽(代码片段)

勇哥java实战分享 勇哥java实战分享     2023-04-02     366

关键词:

RocketMQ 是大家耳熟能详的消息队列,开源项目 rocketmq-spring 可以帮助开发者在 Spring Boot 项目中快速整合 RocketMQ。

这篇文章会介绍 Spring Boot 项目使用 rocketmq-spring SDK 实现消息收发的操作流程,同时笔者会从开发者的角度解读 SDK 的设计逻辑

1 SDK 简介


项目地址:

https://github.com/apache/rocketmq-spring

rocketmq-spring 的本质是一个 Spring Boot starter

Spring Boot 基于“约定大于配置”(Convention over configuration)这一理念来快速地开发、测试、运行和部署 Spring 应用,并能通过简单地与各种启动器(如 spring-boot-web-starter)结合,让应用直接以命令行的方式运行,不需再部署到独立容器中。

Spring Boot starter 构造的启动器使用起来非常方便,开发者只需要在 pom.xml 引入 starter 的依赖定义,在配置文件中编写约定的配置即可。

下面我们看下 rocketmq-spring-boot-starter 的配置:

1、引入依赖

<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-spring-boot-starter</artifactId>
  <version>2.2.3</version>
</dependency>

2、约定配置

接下来,我们分别按照生产者和消费者的顺序,详细的讲解消息收发的操作过程。

2 生产者

首先我们添加依赖后,进行如下三个步骤:

1、配置文件中配置如下

rocketmq:
  name-server: 127.0.0.1:9876
  producer:
      group: platform-sms-server-group
    # access-key: myaccesskey
    # secret-key: mysecretkey
  topic: sms-common-topic

生产者配置非常简单,主要配置名字服务地址生产者组

2、需要发送消息的类中注入 RcoketMQTemplate

@Autowired
private RocketMQTemplate rocketMQTemplate;

@Value("$rocketmq.topic")
private String smsTopic;

3、发送消息,消息体可以是自定义对象,也可以是 Message 对象

rocketMQTemplate 类包含多钟发送消息的方法:

  1. 同步发送 syncSend
  2. 异步发送 asyncSend
  3. 顺序发送 syncSendOrderly
  4. oneway发送 sendOneWay

下面的代码展示如何同步发送消息。

String destination = StringUtils.isBlank(tags) ? topic : topic + ":" + tags;
SendResult sendResult =
         rocketMQTemplate.syncSend(
            destination, 
            MessageBuilder.withPayload(messageContent).
            setHeader(MessageConst.PROPERTY_KEYS, uniqueId).
            build()
          );
if (sendResult != null) 
    if (sendResult.getSendStatus() == SendStatus.SEND_OK) 
       // send message success ,do something 
    

syncSend 方法的第一个参数是发送的目标,格式是:topic + ":" + tags

第二个参数是:spring-message 规范的 message 对象 ,而 MessageBuilder 是一个工具类,方法链式调用创建消息对象。

3 消费者

1、配置文件中配置如下

rocketmq:
  name-server: 127.0.0.1:9876
  consumer1:
    group: platform-sms-worker-common-group
    topic: sms-common-topic

2、实现消息监听器

@Component
@RocketMQMessageListener(
    consumerGroup = "$rocketmq.consumer1.group",  //消费组
    topic = "$rocketmq.consumer1.topic"  					//主题
)
public class SmsMessageCommonConsumer implements RocketMQListener<String> 
    public void onMessage(String message) 
       System.out.println("普通短信:" + message);
    

消费者实现类也可以实现 RocketMQListener<MessageExt>, 在 onMessage 方法里通过 RocketMQ 原生消息对象 MessageExt 获取更详细的消息数据

public void onMessage(MessageExt message) 
    try 
        String body = new String(message.getBody(), "UTF-8");
        logger.info("普通短信:" + message);
     catch (Exception e) 
        logger.error("common onMessage error:", e);
    

4 源码概览

最新源码中,我们可以看到源码中包含四个模块:

1、rocketmq-spring-boot-parent

该模块是父模块,定义项目所有依赖的 jar 包。

2、rocketmq-spring-boot

核心模块,实现了 starter 的核心逻辑。

3、rocketmq-spring-boot-starter

SDK 模块,简单封装,外部项目引用。

4、rocketmq-spring-boot-samples

示例代码模块。这个模块非常重要,当用户使用 SDK 时,可以参考示例快速开发。

5 starter 实现

我们重点分析下 rocketmq-spring-boot 模块的核心源码:


spring-boot-starter 实现需要包含如下三个部分:

1、定义 Spring 自身的依赖包和 RocketMQ 的依赖包 ;

2、定义spring.factories 文件

在 resources 包下创建 META-INF 目录后,新建 spring.factories 文件,并在文件中定义自动加载类,文件内容是:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\\
org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration

spring boot 会根据文件中配置的自动化配置类来自动初始化相关的 Bean、Component 或 Service。

3、实现自动加载类

在 RocketMQAutoConfiguration 类的具体实现中,我们重点分析下生产者和消费者是如何分别启动的。

▍生产者发送模板类:RocketMQTemplate

RocketMQAutoConfiguration 类定义了两个默认的 Bean :

首先SpringBoot项目中配置文件中的配置值会根据属性条件绑定到 RocketMQProperties 对象 中,然后使用 RocketMQ 的原生 API 分别创建生产者 Bean 和拉取消费者 Bean , 分别将两个 bean 设置到 RocketMQTemplate 对象中。

两个重点需要强调:

  • 发送消息时,将 spring-message 规范下的消息对象封装成 RocketMQ 消息对象

  • 默认拉取消费者 litePullConsumer 。拉取消费者一般用于大数据批量处理场景 。

​ RocketMQTemplate 类封装了拉取消费者的receive方法,以方便开发者使用。

▍自定义消费者类

下图是并发消费者的例子:

那么 rocketmq-spring 是如何自动启动消费者呢 ?

spring 容器首先注册了消息监听器后置处理器,然后调用 ListenerContainerConfiguration 类的 registerContainer 方法 。

对比并发消费者的例子,我们可以看到: DefaultRocketMQListenerContainer 是对 DefaultMQPushConsumer 消费逻辑的封装。

封装消费消息的逻辑,同时满足 RocketMQListener 泛化接口支持不同参数,比如 String 、MessageExt 、自定义对象 。

首先DefaultRocketMQListenerContainer初始化之后, 获取 onMessage 方法的参数类型 。

然后消费者调用 consumeMessage 处理消息时,封装了一个 handleMessage 方法 ,将原生 RocketMQ 消息对象 MessageExt 转换成 onMessage 方法定义的参数对象,然后调用 rocketMQListener 的 onMessage 方法。

上图右侧标红的代码也就是该方法的精髓:

rocketMQListener.onMessage(doConvertMessage(messageExt));

6 写到最后

开源项目 rocketmq-spring 有很多值得学习的地方 ,我们可以从如下四个层面逐层进阶:

1、学会如何使用 :参考 rocketmq-spring-boot-samples 模块的示例代码,学会如何发送和接收消息,快速编码;

2、模块设计:学习项目的模块分层 (父模块、SDK 模块、核心实现模块、示例代码模块);

3、starter 设计思路 :定义自动配置文件 spring.factories 、设计配置属性类 、在 RocketMQ client 的基础上实现优雅的封装、深入理解 RocketMQ 源码等;

4、举一反三:当我们理解了 rocketmq-spring 的源码,我们可以尝试模仿该项目写一个简单的 spring boot starter。


如果我的文章对你有所帮助,还请帮忙点赞、在看、转发一下,你的支持会激励我输出更高质量的文章,非常感谢!

欢迎购买《kafka源码解析与实战》

沉静了大半年终于把Kafka相关的知识整理了下,欢迎大家购买。购买方式:登陆京东,搜索关键字:Kafka源码解析与实战。或者点击此链接https://item.jd.com/12277714.html 查看详情

kafka源码解析与实战

Kafka的架构包括Kafka的基本组成,Kafka的拓扑结构以及Kafka的内部通信协议。Kafka内部的通信协议是建立在Kafka的拓扑结构之上,而Kafka的拓扑结构是由Kafka的基本模块所组成的。AKRELEASE2.5.0APRIL15,2020Kafka的基本组成Kafka集群中生产者... 查看详情

分享《tensorflow技术解析与实战》+pdf+源码+李嘉璇

...资料分享:http://blog.51cto.com/14087171《TensorFlow技术解析与实战》高清中文PDF+源代码高清中文PDF,311页,带目录和书签,文字可以复制粘贴,彩色配图。配套源代码。经典书籍。本书从深度学习的基础讲起,深入TensorFlow框架原理、... 查看详情

elasticsearch源码解析与优化实战(张超)----笔记

作者是基于6.1.2,本人在他的基础上梳理ES7.9.3,可能部分章节未更新到,以及有问题的地方,请指正,谢谢!  第1章走进Elasticsearch  第2章准备编译和调试环境  第3章:集群启动流程  第4章&#... 查看详情

《elasticsearch源码解析与优化实战》样章-第6章数据模型

1.概述转载:https://www.easyice.cn/archives/300这里面有一些是我以前不知道的,先记录一下,以后看的时候加深理解。 查看详情

protobuf源码解析与netty+rpc实战(代码片段)

1.背景grpc由protobuf+netty实现,为了研究grpc框架的设计思想,本文从protobuf生成的java源码出发,脱离grpc框架研究protobuf的框架。在此基础上,基于protobuf+netty手写一个rpc框架,熟悉rpc的设计思路。2.逐步深入protobuf2.1protobuf工作流程p... 查看详情

《elasticsearch源码解析与优化实战》第9章:search流程(代码片段)

文章目录一、简介二、索引和搜索2.1、建立索引2.2、执行搜索2.3、searchtype三、分布式搜索过程3.1、协调节点流程3.1.1、Query阶段3.1.1.1、Query阶段源码分析3.1.2、Fetch阶段3.1.2.1、Fetch阶段源码解析3.2、执行搜索的数据节点流程3.2.1、... 查看详情

elasticsearch源码解析与优化实战(张超)----笔记

作者是基于6.1.2,本人在他的基础上梳理ES7.9.3,可能部分章节未更新到,以及有问题的地方,请指正,谢谢!  第1章走进Elasticsearch  第2章准备编译和调试环境  第3章:集群启动流程  第4章&#... 查看详情

elasticsearch源码解析与优化实战(张超)----笔记

作者是基于6.1.2,本人在他的基础上梳理ES7.9.3,可能部分章节未更新到,以及有问题的地方,请指正,谢谢!  第1章走进Elasticsearch  第2章准备编译和调试环境  第3章:集群启动流程  第4章&#... 查看详情

《elasticsearch源码解析与优化实战》第15章:transport模块分析(代码片段)

文章目录简介配置信息传输模块配置通用网络配置Transport总体架构网络层1.网络模块初始化2.Netty4Transport3.Netty4HttpServerTransport服务层1.连接到节点2.发送请求3.定义对Response的处理4.定义对请求的处理REST解析和处理RPC实现RPC的注册和... 查看详情

《elasticsearch源码解析与优化实战》第15章:transport模块分析(代码片段)

文章目录简介配置信息传输模块配置通用网络配置Transport总体架构网络层1.网络模块初始化2.Netty4Transport3.Netty4HttpServerTransport服务层1.连接到节点2.发送请求3.定义对Response的处理4.定义对请求的处理REST解析和处理RPC实现RPC的注册和... 查看详情

java并发编程高阶技术高性能并发框架源码解析与实战

第1章课程介绍(Java并发编程进阶课程)什么是Disruptor?它一个高性能的异步处理框架,号称“单线程每秒可处理600W个订单”的神器,本课程目标:彻底精通一个如此优秀的开源框架,面试秒杀面试官。本章会带领小伙伴们先了... 查看详情

《elasticsearch源码解析与优化实战》第21章:综合应用实践(代码片段)

文章目录简介集群层规划集群规模单节点还是多节点部署移除节点独立部署主节点节点层控制线程池的队列大小为系统cache保留一半物理内存系统层关闭swap配置LinuxOOMKiller优化内核参数索引层使用全局模板索引轮转避免热索引分... 查看详情

《elasticsearch源码解析与优化实战》第21章:综合应用实践(代码片段)

文章目录简介集群层规划集群规模单节点还是多节点部署移除节点独立部署主节点节点层控制线程池的队列大小为系统cache保留一半物理内存系统层关闭swap配置LinuxOOMKiller优化内核参数索引层使用全局模板索引轮转避免热索引分... 查看详情

《elasticsearch源码解析与优化实战》第16章:threadpool模块分析(代码片段)

文章目录一、简介二、线程池类型2.1、fixed2.2、scaling2.3、direct2.4、fixed_auto_queue_size三、处理器设置四、查看线程池4.1、catthreadpool4.2、nodesinfo4.3、nodesstats4.4、nodeshotthreads4.5、Java的线程池结构五、ES的线程池实现5.1、ThreadPool类结构... 查看详情

《elasticsearch源码解析与优化实战》第16章:threadpool模块分析(代码片段)

文章目录一、简介二、线程池类型2.1、fixed2.2、scaling2.3、direct2.4、fixed_auto_queue_size三、处理器设置四、查看线程池4.1、catthreadpool4.2、nodesinfo4.3、nodesstats4.4、nodeshotthreads4.5、Java的线程池结构五、ES的线程池实现5.1、ThreadPool类结构... 查看详情

《elasticsearch源码解析与优化实战》第7章:写流程(代码片段)

文章目录一、简介1.1、文档操作的定义1.2、可选参数二、Index/Bulk基本流程三、Index/Bulk详细流程3.1、协调节点流程3.1.1、参数检查3.1.2、处理pipeline请求3.1.3、自动创建索引3.1.4、对请求的预先处理3.1.5、检测集群状态3.1.6、内容路... 查看详情

《elasticsearch源码解析与优化实战》第6章:数据模型(代码片段)

文章目录关注我的公众号【宝哥大数据】,更多干货一、简介1.1、查询类型二、PacificA算法2.1、数据副本策略2.2、配置管理2.3、错误检测三、数据副本模型3.1、基本写入模型3.2、写故障处理3.3、基本读取模型3.3.1、检索数据过... 查看详情