消息队列---rocketmq-消息存储1(代码片段)

benjious benjious     2023-05-06     477

关键词:

问题 ;

  • 部署时如何知道自己是 broker 还是 NameServer
  • topic 订阅信息放在哪里
  • broker 的作用到底是什么
  • 纪录是如何持久化的
  • 群发的时候,是如何储存消息的
  • send 方法到底有没有使用多线程发送处理,简单叙述一个 send 的过程
  • transientStorePool 作用到底是什么
    以上几个问题,是在学习中思考的问题,我们将在学习过程中逐渐寻找答案。

概述

本文将会走完文件存储的主体流程,后续分析各个重要的类。

源码分析

消息持久化几个重要的类 :

  • DefaultMessageStore
  • commitLog
  • MappedFile

DefaultMessageStore 持有 commitLog ,commitLog 持有 MappedFileQueue ,MappedFileQueue 管理着多个 MappedFile.

首先我们从 DefaultMessageStore 的 putMessage 方法

    /**
     * 1. 判断是否条件满足条件
     * 2. 调用 this.commitLog.putMessage(msg);
     *
     * @param msg Message instance to store
     * @return
     */
    public PutMessageResult putMessage(MessageExtBrokerInner msg) 
        if (this.shutdown) 
            log.warn("message store has shutdown, so putMessage is forbidden");
            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
        

        // 该 broker 是 slave 模式
        if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) 
            long value = this.printTimes.getAndIncrement();
            if ((value % 50000) == 0) 
                log.warn("message store is slave mode, so putMessage is forbidden ");
            

            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
        

        if (!this.runningFlags.isWriteable()) 
            long value = this.printTimes.getAndIncrement();
            if ((value % 50000) == 0) 
                log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());
            

            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
         else 
            this.printTimes.set(0);
        

        if (msg.getTopic().length() > Byte.MAX_VALUE) 
            log.warn("putMessage message topic length too long " + msg.getTopic().length());
            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
        

        if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) 
            log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
            return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
        

        if (this.isOSPageCacheBusy()) 
            return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
        


        long beginTime = this.getSystemClock().now();
        //写入逻辑
        PutMessageResult result = this.commitLog.putMessage(msg);

        long eclipseTime = this.getSystemClock().now() - beginTime;
        if (eclipseTime > 500) 
            log.warn("putMessage not in lock eclipse time(ms)=, bodyLength=", eclipseTime, msg.getBody().length);
        
        this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);

        if (null == result || !result.isOk()) 
            this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
        

        return result;
    

CommitLog 的 putMessage 方法

    /**
     * @param msg
     * @return
     */
    public PutMessageResult putMessage(final MessageExtBrokerInner msg) 
        // Set the storage time
        msg.setStoreTimestamp(System.currentTimeMillis());
        // Set the message body BODY CRC (consider the most appropriate setting
        // on the client)
        msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
        // Back to Results
        AppendMessageResult result = null;

        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

        String topic = msg.getTopic();
        int queueId = msg.getQueueId();

        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
                || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) 
            // Delay Delivery 延迟发送,将topic 换成 SCHEDULE_TOPIC , queueId 换成 延迟时间级别
            // 这是并发消息消费重试关键的一步,下一章会重点探讨消息重试机制与定时消息的实现原理。
            if (msg.getDelayTimeLevel() > 0) 
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) 
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                

                topic = ScheduleMessageService.SCHEDULE_TOPIC;
                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

                // Backup real topic, queueId
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

                msg.setTopic(topic);
                msg.setQueueId(queueId);
            
        

        long eclipseTimeInLock = 0;
        MappedFile unlockMappedFile = null;
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

        putMessageLock.lock(); //spin or ReentrantLock ,depending on store config  自旋或是重入锁看存储配置
        try 
            long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
            this.beginTimeInLock = beginLockTimestamp;

            // Here settings are stored timestamp, in order to ensure an orderly
            // global
            msg.setStoreTimestamp(beginLockTimestamp);

            if (null == mappedFile || mappedFile.isFull()) 
                mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
            
            if (null == mappedFile) 
                log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
            
            //核心逻辑!!可知 mappedFile是最终写入文件中的实施者
            result = mappedFile.appendMessage(msg, this.appendMessageCallback);
            switch (result.getStatus()) 
                case PUT_OK:
                    break;
                case END_OF_FILE:
                    unlockMappedFile = mappedFile;
                    // Create a new file, re-write the message
                    mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                    if (null == mappedFile) 
                        // XXX: warn and notify me
                        log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                        beginTimeInLock = 0;
                        return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
                    
                    result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                    break;
                case MESSAGE_SIZE_EXCEEDED:
                case PROPERTIES_SIZE_EXCEEDED:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
                case UNKNOWN_ERROR:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
                default:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
            

            eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
            beginTimeInLock = 0;
         finally 
            putMessageLock.unlock();
        

        if (eclipseTimeInLock > 500) 
            log.warn("[NOTIFYME]putMessage in lock cost time(ms)=, bodyLength= AppendMessageResult=", eclipseTimeInLock, msg.getBody().length, result);
        

        if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) 
            this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
        

        PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

        // Statistics
        storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
        storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());

        handleDiskFlush(result, putMessageResult, msg);
        handleHA(result, putMessageResult, msg);

        return putMessageResult;
    

总结

我们可以看到 RocketMQ 最主要就是利用映射文件的方法来提高读写效率。

参考资料

  • https://blog.csdn.net/qq496013218/article/details/69397380
  • 《Rocket源码分析》

rocketmq消息队列——消息存储详解(代码片段)

2016年双11前后阿里巴巴将RocketMQ捐赠给Apache基金会,很快就吸引了全球众多开源爱好者加入其社区生态中,并在2017年9月成为Apache基金会的顶级项目。利用RocketMQ可以轻松实现应用解耦、流量消峰、消息分发等功能,并... 查看详情

消息队列rocketmq如何存储消息

Rocket的消息是有consumequeue和commitlog组成。ConsumeQueueConsumequeue是消息的逻辑队列,相当于字典目录,用来指定消息在物理文件(commitlog)上的位置,我们可以在配置中指定consumequeue和commitlog存储的目录。每一个topic下的每个queue都... 查看详情

rocketmq:消息存储机制详解与源码解析(代码片段)

文章目录消息存储机制1.前言2.核心存储类:DefaultMessageStore3.消息存储流程4.消息存储文件5.存储文件内存映射5.1.MapperFileQueue5.2.MappedFile5.2.1.commit5.2.2.flush5.3.TransientStorePool6.刷盘机制6.1.同步刷盘6.2.异步刷盘消息存储机制1.前言... 查看详情

rocketmq使用顺序消息(代码片段)

目录说明生产端消费端总结说明RocketMQ与其它消息队列一样,一个Topic利用多个队列来存储数据,单个队列内的数据是顺序存储的,但队列间的数据无法保证顺序性。RocketMQ目前支持保证某类数据或部分数据的顺序性。... 查看详情

rocketmq源码合集

消息队列中间件RocketMQ源码分析——Message发送与接收消息队列中间件RocketMQ源码分析——Message存储分布式消息队列RocketMQ源码分析——Message拉取与消费(上)分布式消息队列RocketMQ源码分析——Message拉取与消费(下&#x... 查看详情

rocketmq(代码片段)

Rocketmq安装文章目录Rocketmq安装一、Rocketmq简介1消息队列优点:2rocketmq组成部分(1producer生产消息(2Consumer消费producer生产的消息(3BrokerServer接收存储分发消息(4NameServer提供路由元数据3rocketmq架构图4rocketmq的... 查看详情

rocketmq:死信队列和消息幂等

参考技术A上一篇《RocketMQ:消息重试》中我们提到当一条消息消费失败时,RocketMQ会进行一定次数的重试。重试的结果也很简单,无非就是在第N次重试时,被成功消费。或者就是经过M次重试后,仍然没有被消息。这通常是由于... 查看详情

源码分析rocketmq系列索引

1、RocketMQ源码分析之NameServer2、RocketMQ源码分析之Broker概述与同步消息发送原理与高可用设计及思考3、源码分析RocketMQ之CommitLog消息存储机制4、源码分析RocketMQ之消息消费5、源码分析RocketMQ消息消费机制----消费者拉取消息机制6、... 查看详情

rocketmq-如何实现顺序消息

...新增消息、修改消息。如何发送和消费顺序消息我们使用RocketMQ顺序消息来模拟一下订单的场景,顺序消息分为两部分:顺序发送、顺序消费。1.顺序发消息上面代码模拟了按顺序依次发送创建、支付、退款消息到TopicTest中。在ap... 查看详情

rocketmq三种消息发送方式(代码片段)

文章目录消息发送过程1.Producer从NameServer中获取主题路由信息2.构建消息,发送消息问题3.监听队列,消费消息三种消息发送方式同步消息异步消息单向消息消息发送过程通过快速入门对消息的发送和接收有一个粗略的认... 查看详情

rocketmq消息队列的最佳实践(代码片段)

1生产者1.1发送消息注意事项1Tags的使用一个应用尽可能用一个Topic,而消息子类型则可以用tags来标识。tags可由应用自行设置,只有生产者在发送消息设置了tags,消费方在订阅消息时才可以利用tags通过broker做消息过滤... 查看详情

rocketmq使用顺序消息(代码片段)

目录说明生产端消费端总结说明RocketMQ与其它消息队列一样,一个Topic利用多个队列来存储数据,单个队列内的数据是顺序存储的,但队列间的数据无法保证顺序性。RocketMQ目前支持保证某类数据或部分数据的顺序性。... 查看详情

rocketmq事务消息

RocketMQ事务消息在实现上充分利用了RocketMQ本身机制,在实现零依赖的基础上,同样实现了高性能、可扩展、全异步等一系列特性。在具体实现上,RocketMQ通过使用HalfTopic以及OperationTopic两个内部队列来存储事务消息推进状态,如... 查看详情

从浅到深认识rocketmq(代码片段)

目录一、引言二、介绍三、Rocketmq关键概念1、主题与标签2、发送与订阅群组3、Broker与NameServer4、广播消费与集群消费5、消息队列6、集群方式7、顺序消息8、数据存储结构四、所有消息中间件涉及的业务问题(随便看看)1、Publis... 查看详情

各种消息队列如何选择?为何选择rocketmq来保证消息不丢失,及应该采用rocketmq哪种通信模式?

前言消息队列本质上来说,是一个符合先进先出原则的单向队列:一方发送消息并存入消息队列尾部(生产者投递消息),一方从消息队列的头部取出消息(消费者消费消息)。 但对于一个成熟可靠的消息队列来说,所需要解决... 查看详情

各种消息队列如何选择?为何选择rocketmq来保证消息不丢失,及应该采用rocketmq哪种通信模式?

前言消息队列本质上来说,是一个符合先进先出原则的单向队列:一方发送消息并存入消息队列尾部(生产者投递消息),一方从消息队列的头部取出消息(消费者消费消息)。 但对于一个成熟可靠的消息队列来说,所需要解决... 查看详情

rocketmq(二十二)高级功能汇总(代码片段)

1、消息存储分布式队列因为有高可靠性的要求,所以数据要进行持久化存储。消息生成者发送消息MQ收到消息,将消息进行持久化,在存储中新增一条记录返回ACK给生产者MQpush消息给对应的消费者,然后等待消费... 查看详情

rocketmq工作原理

1.消息的生产过程消息生产过程,经历如下过程:1.向NameServer发出获取消息Topic的路由信息的请求2.nameServer返回该Topic的路由表以及Broker列表3.Producer根据代码中指定的Queue选择策略,从Queue中选择一个队列,用于存储消息4.Producer对... 查看详情