rocketmq:消息存储机制详解与源码解析(代码片段)

又蠢又笨的懒羊羊程序猿 又蠢又笨的懒羊羊程序猿     2023-02-04     448

关键词:

消息存储机制

1.前言

本文主要讲解内容是Broker接收到消息生产者发送的消息之后,如何将消息持久化存储在Broker中。

2.核心存储类:DefaultMessageStore

private final MessageStoreConfig messageStoreConfig;	//消息配置属性
private final CommitLog commitLog;		//CommitLog文件存储的实现类->消息存储在commitLog中
private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable;	//消息队列存储缓存表,按照消息主题分组
private final FlushConsumeQueueService flushConsumeQueueService;	//消息队列文件刷盘服务线程
private final CleanCommitLogService cleanCommitLogService;	//过期CommitLog文件删除服务
private final CleanConsumeQueueService cleanConsumeQueueService;	//过期ConsumerQueue队列文件删除服务
private final IndexService indexService;	//索引服务
private final AllocateMappedFileService allocateMappedFileService;	//MappedFile分配服务->内存映射处理commitLog、consumerQueue文件
private final ReputMessageService reputMessageService;//CommitLog消息分发,根据CommitLog文件构建ConsumerQueue、IndexFile文件
private final HAService haService;	//消息主从同步实现服务
private final ScheduleMessageService scheduleMessageService;	//消息服务调度服务
private final StoreStatsService storeStatsService;	//消息存储服务
private final MessageArrivingListener messageArrivingListener;  //消息到达监听器
private final TransientStorePool transientStorePool;	//消息堆外内存缓存
private final BrokerStatsManager brokerStatsManager;	//Broker状态管理器
private final MessageArrivingListener messageArrivingListener;	//消息拉取长轮询模式消息达到监听器
private final BrokerConfig brokerConfig;	//Broker配置类
private StoreCheckpoint storeCheckpoint;	//文件刷盘监测点
private final LinkedList<CommitLogDispatcher> dispatcherList;	//CommitLog文件转发请求

以上属性是消息存储的核心,需要重点关注每个属性的具体作用。

3.消息存储流程

消息存储时序图如下:

消息存储入口:DefaultMessageStore#putMessage

//检查Broker是否是Slave || 判断当前写入状态如果是正在写入,则不能继续 
PutMessageStatus checkStoreStatus = this.checkStoreStatus();		
if (checkStoreStatus != PutMessageStatus.PUT_OK) 
    return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null));


//检查消息主题和消息体长度是否合法
PutMessageStatus msgCheckStatus = this.checkMessages(messageExtBatch);
if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) 
    return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));

//记录开始写入时间
long beginTime = this.getSystemClock().now();
//写入消息
CompletableFuture<PutMessageResult> resultFuture = this.commitLog.asyncPutMessages(messageExtBatch);

resultFuture.thenAccept((result) -> 
    long elapsedTime = this.getSystemClock().now() - beginTime;
    if (elapsedTime > 500) 
        log.warn("not in lock elapsed time(ms)=, bodyLength=", elapsedTime, messageExtBatch.getBody().length);
    
	//记录相关统计信息
    this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
	//存储失败
    if (null == result || !result.isOk()) 
        //存储状态服务->消息存储失败次数自增
        this.storeStatsService.getPutMessageFailedTimes().add(1);
    
);

return resultFuture;

DefaultMessageStore#checkStoreStatus

//存储服务已停止
if (this.shutdown) 
    log.warn("message store has shutdown, so putMessage is forbidden");
    return PutMessageStatus.SERVICE_NOT_AVAILABLE;

//Broker为Slave->不可写入
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) 
    long value = this.printTimes.getAndIncrement();
    if ((value % 50000) == 0) 
        log.warn("broke role is slave, so putMessage is forbidden");
    
    return PutMessageStatus.SERVICE_NOT_AVAILABLE;


//不可写入->broker磁盘已满/写入逻辑队列错误/写入索引文件错误
if (!this.runningFlags.isWriteable()) 
    long value = this.printTimes.getAndIncrement();
    if ((value % 50000) == 0) 
        log.warn("the message store is not writable. It may be caused by one of the following reasons: " +
            "the broker's disk is full, write to logic queue error, write to index file error, etc");
    
    return PutMessageStatus.SERVICE_NOT_AVAILABLE;
 else 
    this.printTimes.set(0);

//操作系统页写入是否繁忙
if (this.isOSPageCacheBusy()) 
    return PutMessageStatus.OS_PAGECACHE_BUSY;

return PutMessageStatus.PUT_OK;

CommitLog#asyncPutMessages

//记录消息存储时间
messageExtBatch.setStoreTimestamp(System.currentTimeMillis());
AppendMessageResult result;

StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

final int tranType = MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag());

//消息类型是否合法
if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) 
    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));


//....

//获取上一个MapperFile对象->内存映射的具体实现
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

//追加消息需要加锁->串行化处理
putMessageLock.lock();
try 
    long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
    this.beginTimeInLock = beginLockTimestamp;

    //记录消息存储时间->保证消息的有序性
    messageExtBatch.setStoreTimestamp(beginLockTimestamp);

    //判断如果mappedFile如果为空或者已满,创建新的mappedFile文件
    if (null == mappedFile || mappedFile.isFull()) 
        mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
    
    //如果创建失败,直接返回
    if (null == mappedFile) 
        log.error("Create mapped file1 error, topic:  clientAddr: ", messageExtBatch.getTopic(), messageExtBatch.getBornHostString());
        beginTimeInLock = 0;
        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
    

    //!!!写入消息到mappedFile中!!!
    result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext);
    //根据写入结果做不同的处理
    switch (result.getStatus()) 
        case PUT_OK:
            break;
        case END_OF_FILE:
            unlockMappedFile = mappedFile;
            // Create a new file, re-write the message
            mappedFile = this.mappedFileQueue.getLastMappedFile(0);
            if (null == mappedFile) 
                // XXX: warn and notify me
                log.error("Create mapped file2 error, topic:  clientAddr: ", messageExtBatch.getTopic(), messageExtBatch.getBornHostString());
                beginTimeInLock = 0;
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
            
            result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext);
            break;
        case MESSAGE_SIZE_EXCEEDED:
        case PROPERTIES_SIZE_EXCEEDED:
            beginTimeInLock = 0;
            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
        case UNKNOWN_ERROR:
        default:
            beginTimeInLock = 0;
            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
    

    elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
    beginTimeInLock = 0;
 finally 
    putMessageLock.unlock();


if (elapsedTimeInLock > 500) 
    log.warn("[NOTIFYME]putMessages in lock cost time(ms)=, bodyLength= AppendMessageResult=", elapsedTimeInLock, messageExtBatch.getBody().length, result);


if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) 
    this.defaultMessageStore.unlockMappedFile(unlockMappedFile);


PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

// Statistics
storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).add(result.getMsgNum());
storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).add(result.getWroteBytes());

//根据刷盘策略进行刷盘
CompletableFuture<PutMessageStatus> flushOKFuture = submitFlushRequest(result, messageExtBatch);
//主从同步
CompletableFuture<PutMessageStatus> replicaOKFuture = submitReplicaRequest(result, messageExtBatch);

MappedFile#appendMessagesInner

assert messageExt != null;
assert cb != null;

//获取写指针/写入位置
int currentPos = this.wrotePosition.get();

//写指针偏移量小于文件指定大小
if (currentPos < this.fileSize) 
    //写入缓冲区
    ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
    byteBuffer.position(currentPos);
    AppendMessageResult result;
    //根据消息类型->批量/单个->进行不同处理
    if (messageExt instanceof MessageExtBrokerInner) 
        //单个消息
        //调用回调方法写入磁盘->CommitLog#doAppend
        result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
                (MessageExtBrokerInner) messageExt, putMessageContext);
     else if (messageExt instanceof MessageExtBatch) 
        //批量消息
        result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
                (MessageExtBatch) messageExt, putMessageContext);
     else 
        //未知消息->返回异常结果
        return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
    
    //更新写指针
    this.wrotePosition.addAndGet(result.getWroteBytes());
    //更新写入时间戳
    this.storeTimestamp = result.getStoreTimestamp();
    //返回写入结果->成功
    return result;

log.error("MappedFile.appendMessage return null, wrotePosition:  fileSize: ", currentPos, this.fileSize);
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);

CommitLog#doAppend

public AppendMessageResult doAppend(final long fileFromOffset,			//文件序列偏移量
                                    final ByteBuffer byteBuffer, 		//NIO字节容器
                                    final int maxBlank,				   //最大可写入字节数	
                                    final MessageExtBrokerInner msgInner, //消息封装实体
                                    PutMessageContext putMessageContext) 
    //文件写入偏移量
    long wroteOffset = fileFromOffset + byteBuffer.position();

    //构建msgId
    Supplier<String> msgIdSupplier = () -> 
        //系统标识
        int sysflag = msgInner.getSysFlag();
        //msgId底层存储由16个字节组成
        int msgIdLen = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
        //分配16个字节的存储空间
        ByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen);
        //8个字节->ip、host各占用4个字节
        MessageExt.socketAddress2ByteBuffer(msgInner.getStoreHost(), msgIdBuffer);
        //清除缓冲区->因为接下来需要翻转缓冲区
        msgIdBuffer.clear();
        //剩下的8个字节用来存储commitLog偏移量-wroteOffset
        msgIdBuffer.putLong(msgIdLen - 8, wroteOffset);
        return UtilAll.bytes2string(msgIdBuffer.array());
    ;

    //获取当前主题消息队列唯一key
    String key = putMessageContext.getTopicQueueTableKey();
    //根据key获取消息存储偏移量
    Long queueOffset = CommitLog.this.topicQueueTable.get(key);
    if (null == queueOffset) 
        queueOffset = 0L;
        CommitLog.this.topicQueueTable.put(key, queueOffset);
    

    // Transaction messages that require special handling
    final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
    switch (tranType) 
        // Prepared and Rollback message is not consumed, will not enter the
        // consumer queuec
        case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
        case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
            queueOffset = 0L;
            break;
        case MessageSysFlag.TRANSACTION_NOT_TYPE:
        case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
        default:
            break;
    

    ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff();
    //计算消息存储长度
    final int msgLen = preEncodeBuffer.getInt(0);

    // Determines whether there is sufficient free space
    //消息是如果没有足够的存储空间则新创建CommitLog文件
    if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) 
        this.msgStoreItemMemory.clear();
        // 1 TOTALSIZE
        this.msgStoreItemMemory.putInt(maxBlank);
        // 2 MAGICCODE
        this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
        // 3 The remaining space may be any value
        // Here the length of the specially set maxBlank
        final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
        byteBuffer.put(this.msgStoreItemMemory.array(), 0查看详情  

2rocketmq源码解析之与springboot集成(代码片段)

上一篇文章分析了一下RocketMQ的基本架构、概念、安装以及使用方式。现在大多数项目都是基于SpringBoot。因为它很方便的自动装配机制,所以现在构建项目都是基于SpringBoot。下面我们来分析一下RocketMQ是如何集成SpringBoot的。... 查看详情

rocketmq源码合集

消息队列中间件RocketMQ源码分析——Message发送与接收消息队列中间件RocketMQ源码分析——Message存储分布式消息队列RocketMQ源码分析——Message拉取与消费(上)分布式消息队列RocketMQ源码分析——Message拉取与消费(下&#x... 查看详情

rocketmq源码解析-broker与namesrv以及consumer交互(代码片段)

​这一篇我们主要来分析下Broker里面的部分逻辑–Broker主要负责消息的存储、投递和查询以及服务高可用保证。下面我们就来大体梳理下其关于消息的投递、存储、拉取相关的一些逻辑。​我们知道broker实例时需要注册到namesrv... 查看详情

3rocketmq源码解析之源代码环境搭建(代码片段)

在之前的文章中我们知道了RocketMQ里面的核心功能、架构和概念。并且也介绍了它的简单使用与SpringBoot的集成。下面开始我们对RocketMQ的源码探索,首先我们先在自己本地搭建RocketMQ的源代码环境。1、下载源代码首先我们可... 查看详情

rocketmq详解(代码片段)

RocketMQ详解1.基础概念2.RocketMQ消费模式2.1广播模式2.2集群模式3.基础架构3.1Broker的存储结构3.2存储文件简介3.3Consumer端的负载均衡机制3.4消息刷盘机制3.5Mmap+pageCache3.5.1传统缓存IO和Mmap3.5.2pageCache3.5.3预映射机制+文件预热机制3... 查看详情

rocketmq源码(12)—broker的消息刷盘源码深度解析一万字(代码片段)

基于RocketMQrelease-4.9.3,深入的介绍了Broker的消息刷盘源码解析,以及高性能的刷盘机制。学习RocketMQ的时候,我们知道RocketMQ的刷盘策略有两个,同步或者异步:同步刷盘:如上图所示,只有在消息真正... 查看详情

rocketmq消息队列——消息存储详解(代码片段)

2016年双11前后阿里巴巴将RocketMQ捐赠给Apache基金会,很快就吸引了全球众多开源爱好者加入其社区生态中,并在2017年9月成为Apache基金会的顶级项目。利用RocketMQ可以轻松实现应用解耦、流量消峰、消息分发等功能,并... 查看详情

rocketmq消息队列——消息存储详解(代码片段)

2016年双11前后阿里巴巴将RocketMQ捐赠给Apache基金会,很快就吸引了全球众多开源爱好者加入其社区生态中,并在2017年9月成为Apache基金会的顶级项目。利用RocketMQ可以轻松实现应用解耦、流量消峰、消息分发等功能,并... 查看详情

rocketmq源码解析-消息消费(代码片段)

RocketMQ源码解析-消息消费1.消费者相关类2.消费者的启动3.消息的拉取4.消费者的负载均衡5.消息的消费6.消费进度管理看了很多遍的代码,还是决定动手把记录下来,梳理一下整体结构和实现细节,给大家一个参考,写的不好的地... 查看详情

rocketmq源码解析-store篇(代码片段)

这一篇我们主要来梳理下`RocketMQ`消息的存储,这一块的逻辑主要是在`rocketmq-store`模块​我们对于这个模块的逻辑梳理主要是借助这些测试类来debug分析主要是MappedFileQueue、MappedFile、CommitLog、MessageStore、ConsumeQueue... 查看详情

rocketmq源码合集

消息队列中间件RocketMQ源码分析——Message发送与接收消息队列中间件RocketMQ源码分析——Message存储分布式消息队列RocketMQ源码分析——Message拉取与消费(上)分布式消息队列RocketMQ源码分析——Message拉取与消费(下&#x... 查看详情

深入剖析rocketmq源码-负载均衡机制(代码片段)

目录一、引言二、RocketMQ的整体架构三、producer消息生产过程3.1路由同步过程3.2 负载均衡过程四、consumer消息消费过程4.1路由同步过程4.2 负载均衡过程五、RocketMQ指定机器消费设计思路六、小结一、引言RocketMQ是一款优秀的分布... 查看详情

rocket详细教程(代码片段)

目录1、RocketMQ整体介绍2、核心概念模型3、RocketMQ-源码包下载与结构说明4、RocketMQ-环境搭建(搭建一个实例)   4.1、Hosts添加信息   4.2、上传解压   4.3、创建存储路径   4.4、编辑RocketMQ配置文件   4.5、修改日志... 查看详情

rocketmq源码解析-store篇(代码片段)

这一篇我们主要来梳理下`RocketMQ`消息的存储,这一块的逻辑主要是在`rocketmq-store`模块​我们对于这个模块的逻辑梳理主要是借助这些测试类来debug分析主要是MappedFileQueue、MappedFile、CommitLog、MessageStore、ConsumeQueue... 查看详情

rocketmq源码(13)—broker消息重放服务reputmessageservice源码解析(代码片段)

基于RocketMQrelease-4.9.3,深入的介绍了Broker消息重放服务ReputMessageService源码。此前我们学习了RocketMQ源码(12)—Broker消息刷盘服务GroupCommitService、FlushRealTimeService、CommitRealTimeService源码深度解析【一万字】,这篇文将讲的是... 查看详情

8rocketmq源码解析之消息发送(代码片段)

...ver以及消息管理Broker。下面我们就可以进行消息发送了。RocketMQ支持三种消息发送方式:同步消息发送(sync):当Producer发送消息到Broker时会同步等待消息处理结果异步消息发送(async):当Producer发送消息到Broker时会指定... 查看详情

8rocketmq源码解析之消息发送(代码片段)

...ver以及消息管理Broker。下面我们就可以进行消息发送了。RocketMQ支持三种消息发送方式:同步消息发送(sync):当Producer发送消息到Broker时会同步等待消息处理结果异步消息发送(async):当Producer发送消息到Broker时会指定... 查看详情