深入浅出rocketmq原理及实战「底层原理挖掘系列」透彻剖析贯穿rocketmq的消息发送的全部流程和落盘原理分析指南(代码片段)

洛神灬殇 洛神灬殇     2023-01-25     276

关键词:

前言介绍

RocketMQ目前在国内应该是比较流行的MQ 了,目前本人也在公司的项目中进行使用和研究,借着这个机会,分析一下RocketMQ 发送一条消息到存储一条消息的过程,这样会对以后大家分析和研究RocketMQ相关的问题有一定的帮助。

技术范围

分析的总体技术范围发送到存储,本文的主要目的是主要是为了认识一条消息并分析被发出且被存储的,代码中,关于 MQ 文件系统的优化,设计等。

现在出发

来自官方源码example的一段发送代码:

DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.start();
Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
producer.shutdown();

send发送的分析

直接看看send方法,send 方法会设置一个默认的 timeout:3秒。默认使用 SYNC 模式,另外有Async和OneWay模式。需要处理方法签名中的 Client 端的异常,网络异常,Broker 端的异常,线程中断异常。

sendDefaultImpl核心实现类

DefaultMQProducerImpl 的 sendDefaultImpl方法就是发送的主要逻辑。

代码里,有个地方可以提一下,关于更新故障时间的策略,RocketMQ有一个类 MQFaultStrategy,用来处理MQ错误,然后对 MQ Server 进行服务降级。

服务降级策略

如果发送一条消息在550ms以内,那么就不用降级,如果550毫秒以外,就进行容错降级(熔断)30 秒,以此类推。

sendKernelImpl核心方法

再看DefaultMQProducerImpl 的 sendKernelImpl发送到内核的方法实现。

先找到broker的地址。尝试压缩大于4M 的消息(批量消息不压缩),然后执行各种钩子。

  • Request对象(存放数据)
  • Context 上下文对象(存放调用上下文)。

这里会设置一个消息生成时间,即bornTimestamp,后面使用消息轨迹的时候,可以查看。

同步模式的核心处理

默认情况下:如果采用SYNC 模式,就调用 MQClientAPIImpl 来发送消息,这一层还是在 Client 模块里,在这一层,会设置更详细的消息细节,构造命令对象。最后调用 remotingClient的 invokeSync 发送消息。

MQClientAPIImpl的sendMessage

MQClientAPIImpl的sendMessage这一层,会给命令对象设置一个CmdCode,叫SEND_MESSAGE,这个东西就是一个和Broker的契约,Broker会根据这个Code进行不同的策略。

RPC的实现方式
  1. 如果这里用RPC的方式,例如,使用一个接口的抽象方法。
  2. 然后,Broker对抽象方法进行 RPC 调用,这样可不可以呢?
  3. 最后,看看 remotingClient的invokeSync是如何实现的。

Remoting模块发送消息实现

invokeSync方法

  1. 首先,执行 RPCBefore 钩子,类似Spring的各种Bean扩展组件
  2. 然后,就是对超时进行判断。
  3. 最后,几乎每个方法都有对超时的判断,超时判断和超时处理在分布式场景非常重要。
  4. 根据addr找到对应的Socket Channel
  5. 然后执行invokeSyncImpl方法。
  6. 这里其实和其他大部分的RPC框架都是类似的了,生产一个永远自增的Request ID,创建一个Feature对象和这个ID绑定,方便Netty返回数据对这个ID对应的线程进行唤醒。
  7. 然后调用Netty的writeAndFlush方法,将数据写进Socket,同时添加一个监听器,如果发送失败,唤醒当前线程。
  8. 发送完毕之后,当前线程进行等待,使用CountDownLatch.wait方法实现,当Netty返回数据时,使用CountDownLatch.countDown进行唤醒
  9. 然后返回从 Broker 写入的结果,可能成功,也可能失败,需要到上层(Client 层)解析,网络层只负责网络的事情。

Netty 会使用 Handler 处理出去的数据和返回的数据,我们看看 Client 端 Netty 有哪些 Handler.

Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.SO_KEEPALIVE, false)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
            .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
            .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
            .handler(new ChannelInitializer<SocketChannel>() 
                @Override
                public void initChannel(SocketChannel ch) throws Exception 
                    ChannelPipeline pipeline = ch.pipeline();
                    if (nettyClientConfig.isUseTLS()) 
                        if (null != sslContext) 
                            pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
                            log.info("Prepend SSL handler");
                         else 
                            log.warn("Connections are insecure as SSLContext is null!");
                        
                    
                    pipeline.addLast(
                        defaultEventExecutorGroup,
                        new NettyEncoder(),
                        new NettyDecoder(),
                        new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
                        new NettyConnectManageHandler(),
                        new NettyClientHandler());
                
            );

使用了一个 Encoder,Decoder,空闲处理器,连接管理器,ClientHandler。

XXCoder就是对Cmd对象进行序列化和反序列化的,这里的空闲使用的读写最大空闲时间为120s,超过这个,就会触发空闲事件。

连接管理器
  • RocketMQ就会关闭Channel 连接。而针对空闲事件进行处理的就是连接管理器了。

  • 连接管理器处理空闲、Close、Connect、异常等事件,使用监听器模式,不同的监听器对不同的事件进行处理。另外,这里也许可以借鉴 EventBus,每个事件可以设置多个监听器。

如何处理返回值

看了RocketMQ中 Netty 的设计,再看看返回值处理就简单了,NettyClientHandler 会在 channelRead0 方法处理 Netty Server 的返回值。对应 RMQ,则是 processMessageReceived 方法。该方法很简洁:

public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception 
        final RemotingCommand cmd = msg;
        if (cmd != null) 
            switch (cmd.getType()) 
                case REQUEST_COMMAND:
                    processRequestCommand(ctx, cmd);
                    break;
                case RESPONSE_COMMAND:
                    processResponseCommand(ctx, cmd);
                    break;
                default:
                    break;
            
        
    

其实,这是一个模板方法,固定算法,由子类实现,分为 Request 实现和 Response 实现。我们看看 Response 实现。

public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) 
        final int opaque = cmd.getOpaque();
        // 找到 Response .
        final ResponseFuture responseFuture = responseTable.get(opaque);
        if (responseFuture != null) 
            responseFuture.setResponseCommand(cmd);
            responseTable.remove(opaque);
            if (responseFuture.getInvokeCallback() != null) 
                executeInvokeCallback(responseFuture);
             else // 返回结果
                responseFuture.putResponse(cmd);
                responseFuture.release();
            
         else 
            log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
            log.warn(cmd.toString());
        
    

通过 cmd 对象的 Request ID 找到 Feature,执行 responseFuture.putResponse,设置返回值,唤醒阻塞等待的发送线程。

这里还有一个 release 调用,这个和异步发送有关,默认最大同时 65535 个异步请求,具体就不展开了。

到这里,唤醒阻塞的发送线程,返回数据,客户端层面的发送就结束了。

Broker端如何处理消息

看源码,看到有个 SEND_MESSAGE Code,是 Client 和 Broker Server 的一个约定代码,我们看看这个代码在哪里用的。

在 broker 模块的 BrokerController 类中,有个 registerProcessor 方法,会将 SEND_MESSAGE Code 和一个 SendMessageProcessor 对象绑定。

NettyServerHandler

NettyRemotingServer是处理Request 的类,ServerBootstrap 会在 pipeline 中添加一个 NettyServerHandler处理器,这个处理器的channelRead0方法会调用 NettyRemotingServer的父类processMessageReceived 方法。

processMessageReceived

从processorTable 里,根据 Cmd Code,也就是 SEND_MESSAGE 获取对应的 Processor

Processor 由 2 部分组成,

一部分是处理数据的对象,一部分是这个对象所对应的线程池。用于异步处理逻辑,防止阻塞 Netty IO线程。

doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);// 处理.
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);

前后都是执行一些钩子,例如 ACL

RocketMQ会有一个 BrokerController 类,会注册 Code 和 Processor 的绑定关系,BrokerController 也会把这些绑定,注册到 Netty Server 中,当 Netty Server 从 Socket 收到 Cmd 对象,根据 Cmd 对象的 Code,就可以找到对应 Processor 类,对数据进行处理。

中间是处理 Request请求的。这个 processRequest 方法,有很多的实现,SendMessageProcessor的sendMessage 是处理消息的主要逻辑。

消息存储引擎,这里我们看DefaultMessageStore的putMessage 实现。

putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);

由于RocketMQ写数据是PageCache里面写的,因此,如果写的慢,就是 PageCache 忙,这里忙的标准是,如果锁文件的时间,超过了 1 秒,那就是忙。

if (this.isOSPageCacheBusy()) // 检查 mmp 忙不忙.
    return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);

最后调用 PutMessageResult result = this.commitLog.putMessage(msg) 写数据。如果耗时超过 500 毫秒,就会打印日志。这样我们排查问题的时候,可以看看 storeStats 的日志。

commitLog 的 putMessage 方法
  1. 先拿到最新的MappedFile 文件,MappedFile 文件的命名是用 offset 命名的,一个文件默认 1gb,这个大小和 mmp 的机制有关,通常不能过大。

  2. 然后上锁,这段代码是可以说整个 RocketMQ Server 的热点区域,

  3. 这里上锁会记录上锁的时间,方便前面做 PageCache Busy 的判断。

写入代码
result = mappedFile.appendMessage(msg, this.appendMessageCallback)

写完之后,释放锁,如果超过 500 毫秒,打印 cost time 日志。

统计

处理刷盘和slave 同步,这里看刷盘策略和同步策略,是 SYNC 还是 ASYNC。经过我的测试,同步刷盘和异步刷盘的性能差距是 10 倍。

而 Slave 的数据同步,如果用 SYNC 模式,tps 最高也就 2000 多一丢度,为什么?内网,两台机器 ping 一下都要 0.2 毫秒,一秒最多 5000 次,再加上处理逻辑, 2000 已经到顶了,网络成了瓶颈。

我们看看 mappedFile.appendMessage 方法的实现。一路追踪,有个关键逻辑, 在 appendMessagesInner 里:

int currentPos = this.wrotePosition.get();
if (currentPos < this.fileSize) 
    ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
    byteBuffer.position(currentPos);
    AppendMessageResult result = null;
    if (messageExt instanceof MessageExtBrokerInner) 
        // 写数据到 缓存
        result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
     else if (messageExt instanceof MessageExtBatch) 
        result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
     else 
        return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
    
    this.wrotePosition.addAndGet(result.getWroteBytes());
    this.storeTimestamp = result.getStoreTimestamp();
    return result;

代码中,使用了 mappedFile 从 Linux 映射的 MMap buffer,对数据进行写入。我们看看 doAppend 方法。

总长度、魔数、CRC 校验、队列 ID、各种 flag、存储时间,物理 offset、存储 IP、时间戳、扩展属性等等。最终,这条消息会被写入到 MMap 中。

那什么时候刷盘
  • 如果是 SYNC 模式,执行 CommitLog 的 handleDiskFlush 的方法时,就会立刻刷盘并等待刷盘结果。
  • 如果是 ASYNC 模式,执行 CommitLog 的 handleDiskFlush 的方法时,会通知异步线程进行刷盘,但不等待结果。

如果没有新数据,则为 500ms 执行一次刷盘策略。

简单说下异步刷盘:

默认刷盘 4 页,Linux 一页是 4kb 数据,4页就是 16kb。

如果写的数据减去已经刷的数据,剩下的数据大于等于 4 页,就执行刷盘,执行 mappedByteBuffer.force() 或者 fileChannel.force(false);

精华推荐|深入浅出rocketmq原理及实战「底层原理挖掘系列」透彻剖析贯穿rocketmq的存储系统的实现原理和持久化机制

RocketMQ的发展历史RocketMQ是一个统一消息引擎、轻量级数据处理平台。RocketMQ是一款阿里巴巴开源的消息中间件。2016年11月28日,阿里巴巴向广西党性培训Apache软件基金会捐赠RocketMQ,成为Apache孵化项目。2017年9月25日,A... 查看详情

精华推荐|深入浅出rocketmq原理及实战「底层源码挖掘系列」透彻剖析贯穿rocketmq的消费者端的运行核心的流程(上篇)

精华推荐|【深入浅出RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程上篇:分析对应总体消费流程的判断和校验以及限流控制和回调等处理流程分析下篇:分析基于上篇的总体流程的... 查看详情

精华推荐|深入浅出rocketmq原理及实战「性能原理挖掘系列」透彻剖析贯穿rocketmq的系统服务底层原理以及高性能存储设计挖掘深入(代码片段)

设计背景消息中间件的本身定义来考虑,应该尽量减少对于外部第三方中间件的依赖。一般来说依赖的外部系统越多,也会使得本身的设计越复杂,采用文件系统作为消息存储的方式。RocketMQ存储机制消息中间件的存... 查看详情

精华推荐|深入浅出rocketmq原理及实战「底层原理挖掘系列」透彻剖析贯穿rocketmq的broker服务端自动创建topic的原理分析和问题要点指南

前提背景使用RocketMQ进行发消息时,一般我们是必须要指定topic,此外topic必须要提前建立,但是topic的创建(自动或者手动方式)的设置有一个开关autoCreateTopicEnable,此部分主要会在broker节点的配置文件的时候进行设置,运行环... 查看详情

深入浅出rocketmq原理及实战「底层原理挖掘系列」透彻剖析贯穿rocketmq的消息发送的全部流程和落盘原理分析指南(代码片段)

前言介绍RocketMQ目前在国内应该是比较流行的MQ了,目前本人也在公司的项目中进行使用和研究,借着这个机会,分析一下RocketMQ发送一条消息到存储一条消息的过程,这样会对以后大家分析和研究RocketMQ相关的问... 查看详情

精华推荐|深入浅出rocketmq原理及实战「底层原理挖掘系列」透彻剖析贯穿rocketmq的broker服务端自动创建topic的原理分析和问题要点指南(代码片段)

前提背景使用RocketMQ进行发消息时,一般我们是必须要指定topic,此外topic必须要提前建立,但是topic的创建(自动或者手动方式)的设置有一个开关autoCreateTopicEnable,此部分主要会在broker节点的配置文件的... 查看详情

深度挖掘rocketmq底层源码「底层源码挖掘系列」透彻剖析贯穿rocketmq的消费者端的运行核心的流程(上篇)(代码片段)

精华推荐|【深入浅出RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程上篇:分析对应总体消费流程的判断和校验以及限流控制和回调等处理流程分析下篇:分析基于上篇的总体... 查看详情

深入浅出spring原理及实战「源码原理实战」从底层角度去分析研究propertysourcesplaceholderconfigurer的原理及实战注入机制

Spring提供配置解析功能主要有一下xml文件占位符解析和Java的属性@Value的占位符解析配置这两种场景进行分析和实现解析,如下面两种案例。xml文件的占位符解析配置<beanid="dataSource"class="com.alibaba.druid.pool.DruidDataSource"init-method="i... 查看详情

深度挖掘rocketmq底层源码「底层系列」深度挖掘rocketmq底层导致消息丢失透析(brokerbusy和tomanyrequest)(代码片段)

承接上文通过上一篇文章《【深度挖掘RocketMQ底层源码】「底层问题分析系列」深度挖掘RocketMQ底层那些导致消息丢失的汇总盘点透析([REJECTREQUEST]》,我们知道了对应的“[REJECTREQUEST]systembusy,startflowcontrolforawhile”的主要... 查看详情

vue开发实战生态篇#20:选择何种模式的路由及底层原理

说明【Vue开发实战】学习笔记。路由类型Hash模式:丑,无法使用锚点定位History模式:需要后端配合,IE9不兼容(可使用强制刷新处理)底层原理原理图如下 查看详情

深入浅出dubbo3原理及实战「技术大纲」深入浅出并发实战课程系列及技术指南

Dubbo3开题简介如开篇所述,Dubbo提供了构建云原生微服务业务的一站式解决方案,可以使用Dubbo快速定义并发布微服务组件,同时基于Dubbo开箱即用的丰富特性及超强的扩展能力,构建运维整个微服务体系所需的各... 查看详情

深入浅出学习透析nginx服务器的架构分析及原理分析「底层技术原理+运作架构机制」

Nginx再次回顾也许你已经忘记了Nginx是做什么的?我来再次给你夯实一下概念。多协议反向代理Nginx是个高性能的Web和反向代理服务器及HTTP服务器,它能反向代理HTTP,HTTPS和邮件相关(SMTP,POP3,IMAP)的协议链接,还可以提供了负载... 查看详情

深入浅出sentinel原理及实战「基础实战专题」零基础实现服务流量控制实战开发指南

你若要喜爱你自己的价值,你就得给世界创造价值。Sentinel的组成部分Sentinel主要由以下两个部分组成。Sentinel核心库(Java客户端):Sentinel的核心库不依赖任何框架或库,能够运行于Java8及以上的版本的运行时环境中,同时对Spri... 查看详情

深入浅出学习透析nginx服务器的架构分析及原理分析「底层技术原理+运作架构机制」

Nginx再次回顾也许你已经忘记了Nginx是做什么的?我来再次给你夯实一下概念。多协议反向代理Nginx是个高性能的Web和反向代理服务器及HTTP服务器,它能反向代理HTTP,HTTPS和邮件相关(SMTP,POP3,IMAP)的协议链接&#... 查看详情

深入理解java中的底层阻塞原理及实现

谈到阻塞,相信大家都不会陌生了。阻塞的应用场景真的多得不要不要的,比如生产-消费模式,限流统计等等。什么ArrayBlockingQueue、LinkedBlockingQueue、DelayQueue等等,都是阻塞队列的实现啊,多简单!阻塞,一般有两个特性很亮眼... 查看详情

深入浅出springcloud原理及实战「netflix系列之hystrix」针对于限流熔断组件hystrix的基本参数和实现原理介绍分析(代码片段)

...此度过糟糕的一生。[温馨提示]承接第一篇文章🏹【深入浅出SpringCloud原理及实战】「Netflix系列之Hystrix」针对于限流熔断组件Hystrix的基本参数和实现原理介绍分析在这里推荐给大家martinfowler的熔断器介绍和权威指南,有兴趣... 查看详情

深入浅出springcloud原理及实战「netflix系列之hystrix」针对于限流熔断组件hystrix的超时机制的原理和实现分析(代码片段)

...此度过糟糕的一生。[温馨提示]承接第一篇文章🏹【深入浅出SpringCloud原理及实战】「Netflix系列之Hystrix」针对于限流熔断组件Hystrix的基本参数和实现原理介绍分析在这里推荐给大家martinfowler的熔断器介绍和权威指南,有兴趣... 查看详情

深入浅出sentinel原理及实战「基础实战专题」零基础探索分析sentinel控制台开发指南

Sentinel控制台Sentinel提供了一个轻量级的开源控制台SentinelDashboard,它提供了机器发现与健康情况管理、监控(单机和集群)、规则管理与推送等多种功能。Sentinel控制台提供的功能如下查看机器列表以及健康情况:Sentnel控制台能... 查看详情