关键词:
文章目录
消息存储机制
1.前言
本文主要讲解内容是Broker接收到消息生产者发送的消息之后,如何将消息持久化存储在Broker中。
2.核心存储类:DefaultMessageStore
private final MessageStoreConfig messageStoreConfig; //消息配置属性
private final CommitLog commitLog; //CommitLog文件存储的实现类->消息存储在commitLog中
private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable; //消息队列存储缓存表,按照消息主题分组
private final FlushConsumeQueueService flushConsumeQueueService; //消息队列文件刷盘服务线程
private final CleanCommitLogService cleanCommitLogService; //过期CommitLog文件删除服务
private final CleanConsumeQueueService cleanConsumeQueueService; //过期ConsumerQueue队列文件删除服务
private final IndexService indexService; //索引服务
private final AllocateMappedFileService allocateMappedFileService; //MappedFile分配服务->内存映射处理commitLog、consumerQueue文件
private final ReputMessageService reputMessageService;//CommitLog消息分发,根据CommitLog文件构建ConsumerQueue、IndexFile文件
private final HAService haService; //消息主从同步实现服务
private final ScheduleMessageService scheduleMessageService; //消息服务调度服务
private final StoreStatsService storeStatsService; //消息存储服务
private final MessageArrivingListener messageArrivingListener; //消息到达监听器
private final TransientStorePool transientStorePool; //消息堆外内存缓存
private final BrokerStatsManager brokerStatsManager; //Broker状态管理器
private final MessageArrivingListener messageArrivingListener; //消息拉取长轮询模式消息达到监听器
private final BrokerConfig brokerConfig; //Broker配置类
private StoreCheckpoint storeCheckpoint; //文件刷盘监测点
private final LinkedList<CommitLogDispatcher> dispatcherList; //CommitLog文件转发请求
以上属性是消息存储的核心,需要重点关注每个属性的具体作用。
3.消息存储流程
消息存储时序图如下:
消息存储入口:DefaultMessageStore#putMessage
//检查Broker是否是Slave || 判断当前写入状态如果是正在写入,则不能继续
PutMessageStatus checkStoreStatus = this.checkStoreStatus();
if (checkStoreStatus != PutMessageStatus.PUT_OK)
return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null));
//检查消息主题和消息体长度是否合法
PutMessageStatus msgCheckStatus = this.checkMessages(messageExtBatch);
if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL)
return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));
//记录开始写入时间
long beginTime = this.getSystemClock().now();
//写入消息
CompletableFuture<PutMessageResult> resultFuture = this.commitLog.asyncPutMessages(messageExtBatch);
resultFuture.thenAccept((result) ->
long elapsedTime = this.getSystemClock().now() - beginTime;
if (elapsedTime > 500)
log.warn("not in lock elapsed time(ms)=, bodyLength=", elapsedTime, messageExtBatch.getBody().length);
//记录相关统计信息
this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
//存储失败
if (null == result || !result.isOk())
//存储状态服务->消息存储失败次数自增
this.storeStatsService.getPutMessageFailedTimes().add(1);
);
return resultFuture;
DefaultMessageStore#checkStoreStatus
//存储服务已停止
if (this.shutdown)
log.warn("message store has shutdown, so putMessage is forbidden");
return PutMessageStatus.SERVICE_NOT_AVAILABLE;
//Broker为Slave->不可写入
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole())
long value = this.printTimes.getAndIncrement();
if ((value % 50000) == 0)
log.warn("broke role is slave, so putMessage is forbidden");
return PutMessageStatus.SERVICE_NOT_AVAILABLE;
//不可写入->broker磁盘已满/写入逻辑队列错误/写入索引文件错误
if (!this.runningFlags.isWriteable())
long value = this.printTimes.getAndIncrement();
if ((value % 50000) == 0)
log.warn("the message store is not writable. It may be caused by one of the following reasons: " +
"the broker's disk is full, write to logic queue error, write to index file error, etc");
return PutMessageStatus.SERVICE_NOT_AVAILABLE;
else
this.printTimes.set(0);
//操作系统页写入是否繁忙
if (this.isOSPageCacheBusy())
return PutMessageStatus.OS_PAGECACHE_BUSY;
return PutMessageStatus.PUT_OK;
CommitLog#asyncPutMessages
//记录消息存储时间
messageExtBatch.setStoreTimestamp(System.currentTimeMillis());
AppendMessageResult result;
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
final int tranType = MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag());
//消息类型是否合法
if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE)
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
//....
//获取上一个MapperFile对象->内存映射的具体实现
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
//追加消息需要加锁->串行化处理
putMessageLock.lock();
try
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
this.beginTimeInLock = beginLockTimestamp;
//记录消息存储时间->保证消息的有序性
messageExtBatch.setStoreTimestamp(beginLockTimestamp);
//判断如果mappedFile如果为空或者已满,创建新的mappedFile文件
if (null == mappedFile || mappedFile.isFull())
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
//如果创建失败,直接返回
if (null == mappedFile)
log.error("Create mapped file1 error, topic: clientAddr: ", messageExtBatch.getTopic(), messageExtBatch.getBornHostString());
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
//!!!写入消息到mappedFile中!!!
result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext);
//根据写入结果做不同的处理
switch (result.getStatus())
case PUT_OK:
break;
case END_OF_FILE:
unlockMappedFile = mappedFile;
// Create a new file, re-write the message
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile)
// XXX: warn and notify me
log.error("Create mapped file2 error, topic: clientAddr: ", messageExtBatch.getTopic(), messageExtBatch.getBornHostString());
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext);
break;
case MESSAGE_SIZE_EXCEEDED:
case PROPERTIES_SIZE_EXCEEDED:
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
case UNKNOWN_ERROR:
default:
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
beginTimeInLock = 0;
finally
putMessageLock.unlock();
if (elapsedTimeInLock > 500)
log.warn("[NOTIFYME]putMessages in lock cost time(ms)=, bodyLength= AppendMessageResult=", elapsedTimeInLock, messageExtBatch.getBody().length, result);
if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable())
this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
// Statistics
storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).add(result.getMsgNum());
storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).add(result.getWroteBytes());
//根据刷盘策略进行刷盘
CompletableFuture<PutMessageStatus> flushOKFuture = submitFlushRequest(result, messageExtBatch);
//主从同步
CompletableFuture<PutMessageStatus> replicaOKFuture = submitReplicaRequest(result, messageExtBatch);
MappedFile#appendMessagesInner
assert messageExt != null;
assert cb != null;
//获取写指针/写入位置
int currentPos = this.wrotePosition.get();
//写指针偏移量小于文件指定大小
if (currentPos < this.fileSize)
//写入缓冲区
ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
byteBuffer.position(currentPos);
AppendMessageResult result;
//根据消息类型->批量/单个->进行不同处理
if (messageExt instanceof MessageExtBrokerInner)
//单个消息
//调用回调方法写入磁盘->CommitLog#doAppend
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
(MessageExtBrokerInner) messageExt, putMessageContext);
else if (messageExt instanceof MessageExtBatch)
//批量消息
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
(MessageExtBatch) messageExt, putMessageContext);
else
//未知消息->返回异常结果
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
//更新写指针
this.wrotePosition.addAndGet(result.getWroteBytes());
//更新写入时间戳
this.storeTimestamp = result.getStoreTimestamp();
//返回写入结果->成功
return result;
log.error("MappedFile.appendMessage return null, wrotePosition: fileSize: ", currentPos, this.fileSize);
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
CommitLog#doAppend
public AppendMessageResult doAppend(final long fileFromOffset, //文件序列偏移量
final ByteBuffer byteBuffer, //NIO字节容器
final int maxBlank, //最大可写入字节数
final MessageExtBrokerInner msgInner, //消息封装实体
PutMessageContext putMessageContext)
//文件写入偏移量
long wroteOffset = fileFromOffset + byteBuffer.position();
//构建msgId
Supplier<String> msgIdSupplier = () ->
//系统标识
int sysflag = msgInner.getSysFlag();
//msgId底层存储由16个字节组成
int msgIdLen = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
//分配16个字节的存储空间
ByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen);
//8个字节->ip、host各占用4个字节
MessageExt.socketAddress2ByteBuffer(msgInner.getStoreHost(), msgIdBuffer);
//清除缓冲区->因为接下来需要翻转缓冲区
msgIdBuffer.clear();
//剩下的8个字节用来存储commitLog偏移量-wroteOffset
msgIdBuffer.putLong(msgIdLen - 8, wroteOffset);
return UtilAll.bytes2string(msgIdBuffer.array());
;
//获取当前主题消息队列唯一key
String key = putMessageContext.getTopicQueueTableKey();
//根据key获取消息存储偏移量
Long queueOffset = CommitLog.this.topicQueueTable.get(key);
if (null == queueOffset)
queueOffset = 0L;
CommitLog.this.topicQueueTable.put(key, queueOffset);
// Transaction messages that require special handling
final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
switch (tranType)
// Prepared and Rollback message is not consumed, will not enter the
// consumer queuec
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
queueOffset = 0L;
break;
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
default:
break;
ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff();
//计算消息存储长度
final int msgLen = preEncodeBuffer.getInt(0);
// Determines whether there is sufficient free space
//消息是如果没有足够的存储空间则新创建CommitLog文件
if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank)
this.msgStoreItemMemory.clear();
// 1 TOTALSIZE
this.msgStoreItemMemory.putInt(maxBlank);
// 2 MAGICCODE
this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
// 3 The remaining space may be any value
// Here the length of the specially set maxBlank
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
byteBuffer.put(this.msgStoreItemMemory.array(), 0查看详情
2rocketmq源码解析之与springboot集成(代码片段)
上一篇文章分析了一下RocketMQ的基本架构、概念、安装以及使用方式。现在大多数项目都是基于SpringBoot。因为它很方便的自动装配机制,所以现在构建项目都是基于SpringBoot。下面我们来分析一下RocketMQ是如何集成SpringBoot的。... 查看详情
rocketmq源码合集
消息队列中间件RocketMQ源码分析——Message发送与接收消息队列中间件RocketMQ源码分析——Message存储分布式消息队列RocketMQ源码分析——Message拉取与消费(上)分布式消息队列RocketMQ源码分析——Message拉取与消费(下... 查看详情
rocketmq源码解析-broker与namesrv以及consumer交互(代码片段)
这一篇我们主要来分析下Broker里面的部分逻辑–Broker主要负责消息的存储、投递和查询以及服务高可用保证。下面我们就来大体梳理下其关于消息的投递、存储、拉取相关的一些逻辑。我们知道broker实例时需要注册到namesrv... 查看详情
3rocketmq源码解析之源代码环境搭建(代码片段)
在之前的文章中我们知道了RocketMQ里面的核心功能、架构和概念。并且也介绍了它的简单使用与SpringBoot的集成。下面开始我们对RocketMQ的源码探索,首先我们先在自己本地搭建RocketMQ的源代码环境。1、下载源代码首先我们可... 查看详情
rocketmq详解(代码片段)
RocketMQ详解1.基础概念2.RocketMQ消费模式2.1广播模式2.2集群模式3.基础架构3.1Broker的存储结构3.2存储文件简介3.3Consumer端的负载均衡机制3.4消息刷盘机制3.5Mmap+pageCache3.5.1传统缓存IO和Mmap3.5.2pageCache3.5.3预映射机制+文件预热机制3... 查看详情
rocketmq源码(12)—broker的消息刷盘源码深度解析一万字(代码片段)
基于RocketMQrelease-4.9.3,深入的介绍了Broker的消息刷盘源码解析,以及高性能的刷盘机制。学习RocketMQ的时候,我们知道RocketMQ的刷盘策略有两个,同步或者异步:同步刷盘:如上图所示,只有在消息真正... 查看详情
rocketmq消息队列——消息存储详解(代码片段)
2016年双11前后阿里巴巴将RocketMQ捐赠给Apache基金会,很快就吸引了全球众多开源爱好者加入其社区生态中,并在2017年9月成为Apache基金会的顶级项目。利用RocketMQ可以轻松实现应用解耦、流量消峰、消息分发等功能,并... 查看详情
rocketmq消息队列——消息存储详解(代码片段)
2016年双11前后阿里巴巴将RocketMQ捐赠给Apache基金会,很快就吸引了全球众多开源爱好者加入其社区生态中,并在2017年9月成为Apache基金会的顶级项目。利用RocketMQ可以轻松实现应用解耦、流量消峰、消息分发等功能,并... 查看详情
rocketmq源码解析-消息消费(代码片段)
RocketMQ源码解析-消息消费1.消费者相关类2.消费者的启动3.消息的拉取4.消费者的负载均衡5.消息的消费6.消费进度管理看了很多遍的代码,还是决定动手把记录下来,梳理一下整体结构和实现细节,给大家一个参考,写的不好的地... 查看详情
rocketmq源码解析-store篇(代码片段)
这一篇我们主要来梳理下`RocketMQ`消息的存储,这一块的逻辑主要是在`rocketmq-store`模块我们对于这个模块的逻辑梳理主要是借助这些测试类来debug分析主要是MappedFileQueue、MappedFile、CommitLog、MessageStore、ConsumeQueue... 查看详情
rocketmq源码合集
消息队列中间件RocketMQ源码分析——Message发送与接收消息队列中间件RocketMQ源码分析——Message存储分布式消息队列RocketMQ源码分析——Message拉取与消费(上)分布式消息队列RocketMQ源码分析——Message拉取与消费(下... 查看详情
深入剖析rocketmq源码-负载均衡机制(代码片段)
目录一、引言二、RocketMQ的整体架构三、producer消息生产过程3.1路由同步过程3.2 负载均衡过程四、consumer消息消费过程4.1路由同步过程4.2 负载均衡过程五、RocketMQ指定机器消费设计思路六、小结一、引言RocketMQ是一款优秀的分布... 查看详情
rocket详细教程(代码片段)
目录1、RocketMQ整体介绍2、核心概念模型3、RocketMQ-源码包下载与结构说明4、RocketMQ-环境搭建(搭建一个实例) 4.1、Hosts添加信息 4.2、上传解压 4.3、创建存储路径 4.4、编辑RocketMQ配置文件 4.5、修改日志... 查看详情
rocketmq源码解析-store篇(代码片段)
这一篇我们主要来梳理下`RocketMQ`消息的存储,这一块的逻辑主要是在`rocketmq-store`模块我们对于这个模块的逻辑梳理主要是借助这些测试类来debug分析主要是MappedFileQueue、MappedFile、CommitLog、MessageStore、ConsumeQueue... 查看详情
rocketmq源码(13)—broker消息重放服务reputmessageservice源码解析(代码片段)
基于RocketMQrelease-4.9.3,深入的介绍了Broker消息重放服务ReputMessageService源码。此前我们学习了RocketMQ源码(12)—Broker消息刷盘服务GroupCommitService、FlushRealTimeService、CommitRealTimeService源码深度解析【一万字】,这篇文将讲的是... 查看详情
8rocketmq源码解析之消息发送(代码片段)
...ver以及消息管理Broker。下面我们就可以进行消息发送了。RocketMQ支持三种消息发送方式:同步消息发送(sync):当Producer发送消息到Broker时会同步等待消息处理结果异步消息发送(async):当Producer发送消息到Broker时会指定... 查看详情
8rocketmq源码解析之消息发送(代码片段)
...ver以及消息管理Broker。下面我们就可以进行消息发送了。RocketMQ支持三种消息发送方式:同步消息发送(sync):当Producer发送消息到Broker时会同步等待消息处理结果异步消息发送(async):当Producer发送消息到Broker时会指定... 查看详情