跳至主要內容

RocketMQ 源码分析-消息存储和持久化

lament-z大约 27 分钟

RocketMQ 的消息存储和持久化

本文会先从概念上介绍 RocketMQ 消息存储的设计,为了达成设计目标实现了哪些机制,顺便补充一些背景知识方便读者理解机制的实现。之后会结合源码来详解具体机制的实现。

消息存储模块的设计概要与分析

先笼统的解释 MQ 产品的方案,然后具体到 RocketMQ

常见持久化/存储方案简介

任意应用的存储方案常见的基本上就三种:要么你直接使用文件系统进行数据存储,这个在桌面应用里很常见;要么使用各类 KV 存储,比如使用 redis 作为数据库并开启持久化;要么用关系型数据库,比如 MySQL。

文件系统操作效率最高,但可靠性最低,经典场景就是“突然停电/死机/重启,我忘记保存了!!!”; KV存储方式基本介于中间,这也是为什么大家都喜欢在数据库前加一层 Redis;数据库可靠性最高,但是性能相对前两者最差,这也是为啥所有性能压力的场景我们都要想方设法不让请求直接打在数据库上,同时还要让数据库尽可能的具备更大的抗压能力。

RocketMQ 的持久化设计

RocketMQ 作为一款分布式、高性能、高可用的消息队列产品,为了提供持久化能力,并且还要满足使用中可以通过多种方式查找消息的要求,它采用的是自研文件系统的方案。

提示

基于文件编程:
日常我们接触最多的是基于内存编程,就比如数组、链表、HashMap 这些数据结构的读写都是直接对内存操作。而文件编程则是对磁盘上的数据(文件)进行操作,这就跟内存编程产生了极大的不同。

首先内存编程直接操作内存非常简单,而文件编程需要访问硬盘进行读写,这就来到了 I/O 的领域;然后是访问速读,内存的数据传输速读天然比硬盘要快,而且是数量级的快;硬盘的物理设计本身也会对数据读写有很大的影响,无论是普通硬盘还是SSD,进行文件编程时往往会对此进行特殊的优化,比如 MySQL InnoDB 引擎的默认索引的数据结构。

RocketMQ 的自研文件系统,后面就叫他存储模块(rocketmq/store),主要包括三种文件: CommitLog、ConsumeQueue、Index File。

其中 CommitLog 文件采用顺序写的方式用于存储所有消息数据,它是最基础也是最核心的数据文件。ConsumeQueue 是为了提升消费效率的设计,当消息存入 CommitLog 后,会被异步转发至 ConsumeQueue 中,用于供给给消费端进行消费。Index 则是为快速检索/查找消息服务,它里面存储的是 msg key 与 offset 的关系。

至此,我们可以对存储模块有个大致的认识,当 Broker 收到消息后,它会存入 CommitLog 进行持久化,同时异步转发给 ConsumeQueue 供客户端进行消费,当需要检索消息时,则通过 Index 定位消息在文件中的具体位置。

接下来详细介绍一下这三种文件的具体信息。

存储模块的目录结构

# 仅供参考
.
├── checkpoint
├── commitlog
│   └── 00000000000000000000
├── config
│   ├── consumerFilter.json
│   ├── consumerFilter.json.bak
│   ├── consumerOffset.json
│   ├── consumerOffset.json.bak
│   ├── delayOffset.json
│   ├── delayOffset.json.bak
│   ├── subscriptionGroup.json
│   ├── topics.json
│   └── topics.json.bak
├── consumequeue
│   └── TopicTest
│       ├── 0
│       │   └── 00000000000000000000
│       ├── 1
│       │   └── 00000000000000000000
│       ├── 2
│       │   └── 00000000000000000000
│       └── 3
│           └── 00000000000000000000
├── index
└── lock

CommitLog

对于文件编程,或者通俗的说就是往硬盘里写数据,最快的方式就是顺序写,这跟硬盘的物理设计有关(虽然 SSD 跟普通硬盘磁盘式的写入方式不同,导致顺序写比随机写快的原因不同,但结论一致)。

为避免翻译带来的歧义

顺序读写 Sequential Access
随机读写 Random Access
Random Access: the process of transferring information to or from memory in which every memory location can be accessed directly rather than being accessed in a fixed sequence. 随机读写/存取,这个是翻译上存在已久的争议,不必深究,简单的说就是可以对任意位置直接进行读写。

不同硬盘 IO 的区别

普通硬盘读取数据是从磁道中读取,先定位磁道,然后在磁道中读写,因此对于普通硬盘而言连续 IO (Sequential IO) 效率高于随机 IO (Random IO),因为磁盘寻道更少。

而 SSD 读取数据的方式有点像内存,它最小可读单元是 Page,Page 大小由硬盘本身决定,家用版有 2k、4k、8k、16k 等等,服务器版会有一些超大页的,不过通常是 4k(ps:这就是“4k对齐”的底层逻辑)。

扯远了,SSD 的优势除了天然比传统机械硬盘更快之外,实际它是 random IO 效率很高,跟传统硬盘比,它不需要寻道,跟自身 Sequential IO 比也要更快,当然综合来看其实差别不大。SSD IO 的另一个特点是读的速读远大于写。只看数据写入的话,由于 “erase-before-write” 机制的存在,连续写好于随机写(比如连续写入 1m > 随机写入 256 个 4k)。

因此 RocketMQ 采用将所有消息按抵达 broker 的次序顺序写入 CommitLog,每个文件大小固定,写满一个就去写新的。

// CommitLog 源码位置
org.apache.rocketmq.store.CommitLog

CommitLog 这个类你简单理解它内部包含一个 MappedFileQueue,而 MappedFileQueueMappedFile 组成。

MappedFileQueue 可以暂且看作是一个 MappedFile 组成的数组,它的真实类型是: CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();。对应到存储模块的目录结构中,你可以暂且认为它就是那个 commitlog/ 目录,具体细节在后面会详细展开。

MappedFile 目前可以简单理解为就是 commitLog/ 目录下的 commitLog 文件,它内部数据就是系统写入的消息,具体细节在后面会详细展开。

然后我们来介绍下 CommitLog 文件的设计,首先单个文件大小固定,默认为 1GB,文件名长度固定为 20,文件名格式为 偏移量+高位补0 | 000... + offset ,看起来就是00000000000000000321这种样子。

文件大小的配置:Broker 配置文件中的 mapedFileSizeCommitLog 选项。

CommitLog 文件内部的结构大致为:已写入的全部消息:大小不定 + maxBlank(空闲空间大小):4字节 + BLANK_MAGIC_CODE:4字节

存储目录: 默认值:${ROCKET_HOME}/store/commitlog,这里的 ${ROCKET_HOME} 通常就是你用户目录,你可以理解为~。可以通过修改 broker 配置文件中的 storePathRootDir 来自定义存储目录。

  • 简单总结

    先回顾一下 commitlog 文件的命名方式,看似平平无奇的设计里隐藏着一个优秀的特性。

    这个设计巧妙的地方在于,只要给出任意一个消息的偏移量 (offset),就可以利用二分查找迅速定位这个消息存储在哪个 CommitLog 中,定位到文件后只需要msg.offset - commitLog.name 就能得到消息在 commitLog 中的绝对位置。

    以上这个查找流程相当于轻松实现了一套基于文件系统的高效随机访问 (Random Access) 的功能。文件系统的随机访问,尤其是传统机械硬盘上的随机访问是一个非常低效的过程,同理可参考之前提到的 MySQL 索引的数据结构选择,也是为了实现相同的目标。

    之前说了写入方式就是顺序写入,一个文件写满之后写入下一个文件,实际上你想想看就知道,消息本身长度是不可控的,不可能把 1GB 正好写满,如果一个消息来写 CommitLog 时发现自己的长度 + 8 超过当前 commitLog 文件的剩余空间时,就会直接创建一个新的 commitLog 并写入消息。

    // CommitLog::doAppend
     if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank)
    

    前面说的 + 8,也就是代码中的END_FILE_MIN_BLANK_LENGTH,实际上就是说每个 CommitLog 文件至少会有 8 个字节的空闲,高 4 位用于记录 CommitLog 文件剩余的空闲空间,低 4 位存放魔数。

    最后,通过详细介绍(其实还是有很多细节没讲,会在后面分析具体机制时讲解)CommitLog 文件,我们不难发现该文件的设计核心就是在强调写入效率的最大化,同时兼顾了高效的根据 offset 进行随机读的能力(因为消息只写不删,而写入全是顺序写,删除是直接删 CommitLog 文件)。至此作为消息队列,消息写入的能力有了,而消费并没有实现,毕竟 RocketMQ 的卖点是基于消息主题 Topic 进行消息传输,很明显虽然 CommitLog 写入很高效,但它完全没有提供根据 topic 去找到消息的能力,总不可能去遍历 commitLog 去找 topic,何况相同 topic 下的消息很可能分布在不同的 commitLog 文件中,所以接下来我们就来看看专门为消息消费服务的 ConsumeQueue。

ConsumeQueue

ConsumeQueue 设计目的也非常明确,那就是为消费端提供高效的根据 topic 查找消息的能力。

而同一个 topic 中的消息在 CommitLog 文件中的分布基本上可以认为是不连续和随机的,想要根据 topic 快速查找消息,ConsumeQueue 的做法相当于是给 commitlog 加了 Topic 索引。

从前文的目录结构中就可以清楚的看到,ConsumeQueue/ 下的一级目录名就是主题 topic 名称,二级目录名就是该主题包含的队列ID,二级目录下的文件就是待消费的消息队列,这个待消费的消息队列就是 ConsumeQueue 文件。

前面已经提到,ConsumeQueue 中的数据来自 CommitLog 写入完成后的异步转发。这里异步转发的数据实际上就是 ConsumeQueue 文件中最小的数据单元。

// 源码位置
org.apache.rocketmq.store.ConsumeQueue

ConsumeQueue 这个类内部依然包含一个 MappedFileQueue,而 MappedFileQueue 依然 MappedFile 组成。

MappedFileQueue 在这里相当于是 ConsumeQueue/queueid/ 目录,而 MappedFile 相当于是 ConsumeQueue 文件,它里面存储的就是异步转发来的数据。

然后来看 ConsumeQueue 文件的设计,ConsumeQueue 整体都采用了固定长度的设计,其最小数据单元,随便起个名字,cqNode,定长 20 byte,内部结构为:

8 byte4 byte8 byte
commitlog-offsetmsg.sizehash(msg.tag)
该消息在 commitlog 中的偏移量消息的总长度消息的 tag 的 hash 值

单个 ConsumeQueue 文件默认包含 30w 个 cqNode,因此 ConsumeQueue 文件的固定大小为:300000 * ConsumeQueue.CQ_STORE_UNIT_SIZE(CQ_STORE_UNIT_SIZE 就是 cqNode 的定长 20 byte)。

ConsumeQueue 同样是顺序写,文件命名的方式与 commitlog 文件一致,也是 offset + 高位补0,唯一的区别是这里的 offset 永远是 20 的整数倍。

存储目录: $HOME/store/consumequeue/{topic}/{queueID}/{ConsumeQueueFile}

  • 总结分析
    了解 ConsumeQueue 的设计之后我们来看它为消费消息带来了什么变化。

    消费端可以根据 topic 和 queueid 轻松定位到具体的 ConsumeQueue 文件,接下来要找到需要的 cqNode。

    消费端在当前主题的当前队列下的消费进度实际上就是所需 cqNode 在 ConsumeQueue 中的偏移量, 比如你消费到第3条消息,那么 3 * 20 就是第四条消息的 cqNode 在 ConsumeQueue 中的偏移量。

    觉得不好理解可以参考下表:

    消费进度012...
    cqNode-head-position0*201*202*20...
    ConsumeQueuecqNode-0cqNode-1cqNode-2....

    其实仔细思考一下你会发现由于全是定长设计,ConsumeQueue 完全可以看作是一个数组,它的每个元素都是一个 cqNode,数组下标和对应 cqNode 在文件中的起始位置就是下标*20的线性关系。

    无论消费进度在这里代表最新已消费进度还是下一个未消费进度,无非就是计算消息起始位置时候 +20/-20 的区别,常数计算反正不会影响线性关系。

    准确定位到 cqNode 后,通过 commitlog-offset 可以从 commitlog 中准确定位到消息所处具体的文件的具体位置起始, msg.size 可以准确读出消息的全部数据。而 hash(msg.tag) 是用来支持消息过滤功能的,也就是去 commitLog 中查找之前多加一个根据 tag 过滤的环节,需要注意的是,为了满足定长设计,这里存储的是 tag 的 hash 值,注意哈希冲突。

Index File

有了 CommitLog 和 ConsumeQueue,其实 RocketMQ 的基础功能基本齐活儿了。

Index 文件的引入则可以看作是产品功能增强,它为 rocketmq 提供了按照消息属性进行检索的能力。 这里说的消息属性就是 Message 类里的 private Map<String/*name*/, String/*value*/> properties;。 后面提到的 Key 实际上是 Key = Topic + "#" + msg.keymsg.key 就是 Message.properties.name,但是要注意 Message.getKeys() 方法返回的是所有 name + 空格拼接的字符串。

源码位置:org.apache.rocketmq.store.index.IndexFile

Index 文件也是定长设计,主要由三个部分组成:文件头、哈希槽(Hash Slot)、索引数据(indexData)。其中文件头固定 40 byte,Hash Slot 500万个,indexData 槽 2000万个。数据也是来自 commitlog,消息写完 commitlog 后会转发 key 和 commitlog-offset(phyoffset)过来。

  • 大体结构:

    文件头 indexHeaderHash SlotindexData Slot
    40 byte4 byte * 500w20 byte * 2000w
  • 文件头 indexHeader:

    beginTimestampendTimestampbeginPhyoffsetendPhyoffsethashSlotCountindex count
    8 byte8 byte8 byte8 byte4 byte4 byte
    首个消息 put 的存储时间最新消息的存储时间首个的消息在 commitlog 中的偏移量最新消息的偏移量已使用的 hashSlot消息已使用的 indexData 的总数

    文件头实际上当成临时变量存储区域就好了。begin 开头的就是 index 文件中收到的第一个消息的信息,end 开头的就是最新消息 put 时的信息,hashSlotCount 是有消息的 key 落到任意槽时自增 +1,index count 是每次有消息进来就自增+1。

  • hashSlot:

    4 byte,里面的数据是 indexcount,实际上 40 + 4*500w + indexcount * 20 = index-offset,你把它当成 hashMap 里链表的尾指针就好。

  • indexData:

    hashcodephyoffsettimedifpreIndexNo
    消息的 key 的 hashcode消息在 commitlog 中的 offsettimedif = 该消息存入 index 文件时的timestamp - indexHeader.beginTimestamp (秒)0 or indexcount,其实就是冲突时前一个结点的 offset,计算方式跟 hashSlot 里面一样,当成前指针即可
  • 总结

    根据以上说明你可以发现每个 index file 实际上就是一个 hashMap。与 ConsumeQueue 类似,它的数据也来自 commitlog,不同于前两个文件的顺序写,IndexFile 中只有 indexData 部分是顺序写。

    每当消息的 key 和 commitlog-offset 转发给 IndexFile 后,它会先拼接生成 Key,然后对 Key 进行哈希,哈希方法就是简单的 String::hash 取绝对值,然后对 hashSlotNum(500w) 取模得到 hashSlot 的逻辑位置 slotPos,int slotPos = keyHash % this.hashSlotNum;,之后定长计算得到 slot 在 IndexFile 中的 offset。indexData 的 index-offset 计算方式也差不多,因为这部分是顺序写,所以直接 40 + 4*500w + indexHeader.indexcount * 20

    所谓根据属性查找消息,其实就是根据 key 得到 hashcode,根据 hashcode 找到 slot-offset,从中取出 indexcount,计算出 indexData 的 index-offset,取出 commitlog-offset,有了 commitlog-offset 就可以去 commitlog 里取消息了。

    所以你看,不管是 consumequeue 还是 IndexFile 都是 先找到消息的 commitlog-offset,然后回到 commitlog 中查找。

CheckPoint

源码:StoreCheckpoint

这个文件就是用来记录刷盘时间点的。

physicMsgTimestamplogicsMsgTimestampindexMsgTimestamp
CommitLog 刷盘时间点ConsumeQueue 刷盘时间点IndexFile 刷盘时间点

因为是记录时间戳,所以都是占 8 byte。

核心机制

MMAP

简单介绍一下 MMAP,前文中反复出现的 MappedFile 就是这种技术的应用。

它是一种通过系统调用将文件映射到内存的技术,零拷贝方案之一。

简单的说就是可以直接从内核空间映射到用户空间,并且数据的修改和数据写入硬件(刷盘)是分开的。 你可以使用修改内存的方式修改文件数据,并且可以自行决定在什么时间点将修改刷盘。

对 MMAP 感兴趣可以先去看操作系统(Linux)、内存、IO 等相关知识。

CommitLog 刷盘

在 Broker 上篇中我们提到了接收消息时会发起异步调用进行刷盘,也就是 SendMessageProcessor::asyncSendMessage 中的 this.brokerController.getMessageStore().asyncPutMessage(msgInner);

现在来具体看一下刷盘流程,源码位置:DefaultMessageStore::asyncPutMessage

@Override
 public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {

     // 首先检查 MsgStore 状态是否正常,正常则返回 PUT_OK。
     // 以下四种状态均为异常
     // 1. MsgStore 是否为 shutdown 状态。
     // 2. Broker 是否为 Slave
     // 3. MsgStore 是否为可写状态
     // 4. 是否缓存页繁忙
     PutMessageStatus checkStoreStatus = this.checkStoreStatus();
     // 不正常直接返回
     if (checkStoreStatus != PutMessageStatus.PUT_OK) {
         return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null));
     }
     // 检查消息状态,检查 topic 和属性长度 不能过长
     PutMessageStatus msgCheckStatus = this.checkMessage(msg);
     if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
         return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));
     }
     long beginTime = this.getSystemClock().now();
     // 异步调用 coomitlog 存储消息
     CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);

     putResultFuture.thenAccept((result) -> {
         long elapsedTime = this.getSystemClock().now() - beginTime;
         if (elapsedTime > 500) {
             log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
         }
         this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);

         if (null == result || !result.isOk()) {
             this.storeStatsService.getPutMessageFailedTimes().add(1);
         }
     });

     return putResultFuture;
 }

然后来到 this.commitLog.asyncPutMessage(msg);,该方法有两种实现,一种是分布式 CommitLog ,另一种就是普通的 commitLog,这里我们看普通的即可,5.0 之前不建议使用分布式。

// CommitLog::asyncPutMessage

// 前面 msg 组装啥的省略

// 先上锁 默认是 ReentrantLock
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
try {
  // 拿到最新的 mappedFile,mmap,其实就是最新的 commitlog 文件
  MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
  // 继续省略  

  // 注意同名方法,前面那个 getLastMappedFile() 是会返回 null的。
  // mappedFile 为空 or 写满,则创建新文件
  if (null == mappedFile || mappedFile.isFull()) {
      // 实际调用的是 tryCreateMappedFile(0) -> doCreateMappedFile -> ...
      // 调用链太长了,正常情况下,最后负责创建 mappedFile 的一定是 allocateMappedFileService::mmapOperation
      mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
  }

  // mappedFile.appendMessage 就是 前文说的 消息会顺序写入 CommitLog
  // 调用链结尾是 CommitLog.doAppend
  result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);

  switch (result.getStatus()) {
      //...
      /* 这里看这好几个分支,实际上就三种状态:
      PUT_OK 代表写入成功,
      END_OF_FILE 代表文件写满了,创建一个新文件重新 appendMessage,
      其他就是消息格式异常和未知错误,理论上客户端如果正确校验过消息,消息格式异常也不会出现在这里(防御性编程)。
      所以真的看到消息错误的异常时要小心,特别要注意该消息的来源。
      */
  }
  // ...

   // submitFlushRequest 就是刷盘方法,里面有同步刷盘 SYNC_FLUSH,异步刷盘 ASYNC_FLUSH 两种逻辑
   // 无论是哪种刷盘,都是创建请求,然后提交给 xxService(就是不同类型的刷盘线程),Service/线程去完成刷盘操作

   // 就比如 flushCommitLogService 可以是 GroupCommitService 类型(同步) ,也可以是 FlushRealTimeService(异步)。
   // 最终执行的方法是 mappedByteBuffer.force(); 或者 fileChannel.force();
   // 执行成功,则 pageCache 内的数据就被写入硬盘
   // 这里还有个概念是 GroupCommit,后面再详细说明
   CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);

   // 这个提交重复请求,就是 broker 高可用,master 写完给 slave 转发。
   CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);

   // ...
}
  • CommitLog::submitFlushRequest
public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {
        // Synchronization flush
        if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
            final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
            if (messageExt.isWaitStoreMsgOK()) {
              // 构造 request
                GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
                        this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                // 提交 request  多眼熟几遍
                service.putRequest(request);
                return request.future();
            } else {
                service.wakeup();
                return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
            }
        }
        // Asynchronous flush
        else {
            if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                // 默认情况,真实类型 FlushRealTimeService
                flushCommitLogService.wakeup();
            } else  {
                // 开启 TransientStorePool 机制时
                // 真实类型 CommitRealTimeService
                commitLogService.wakeup();
            }
            return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
        }
    }
  • 自动创建 MappedFile 的线程: allocateMappedFileService

    allocateMappedFileService::mmapOperation这里稍微补充说明以下,allocateMappedFileService 这个线程专门负责自动创建 MappedFile 文件,它里面维护一个 requestTable,不管是 CommitLog 还是 ConsumeQueue ,只要需要创建文件的时候就会生成一个创建请求 AllocateRequest,然后放入 requestTable, 配合 CountDownLatch ,allocateMappedFileService 就会自动从这个表里取出信息然后创建文件。成功创建好文件之后会对文件预热,MappedFile::warmMappedFile,预热行为就是给文件里写满0。全部完成后创建好的 mappedFile 会 set 回 AllocateRequest 里,这样请求方就可以直接从 requestTable 里读取了。

  • 组提交 GroupCommit :

    GroupCommitService里面有一个“组提交(GroupCommit)”的概念,里面应用了一个读写分离的小技巧。

    GroupCommitService 里维护了两个 List,requestsRead 和 requestsWrite,刷盘请求(GroupCommitRequest)会进入 requestsWrite,而刷盘线程从 requestsRead 里读取刷盘请求。 每当 requestsRead 里的请求全部执行完后,就会把 requestsRead 重置为一个空 List,然后去和 requestsWrite 交换数据。 整个过程中只有交换数据这一步是阻塞的。(实际代码中的交换流程好像是直接 Read = Write,然后 write = new List(),不重要,反正一个意思)

  • flushCommitLogService

    if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
              this.flushCommitLogService = new GroupCommitService();
          } else {
              this.flushCommitLogService = new FlushRealTimeService();
          }
    

    GroupCommitService 不会在没有刷盘任务的时候一直空转,他会停个 10ms,如果还是没有刷盘任务,则继续停。

    FlushRealTimeService 的线程间隔默认是 200ms。

    他俩的源码就不展开看了,感兴趣可以自行浏览,他们仨(包括 CommitRealTimeService)都是线程,直接看 run 方法即可。

  • commitLogService

    CommitLog 类里还有一个线程叫做 commitLogService,这个线程只有在开启 TransientStorePool 机制时才会启动,他是用来代替 flushCommitLogService的。

    // CommitLog::start
    public void start() {
          this.flushCommitLogService.start();
    
          if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
              this.commitLogService.start();
          }
      }
    

    它的真实类型是 CommitRealTimeService,它刷盘的方式跟前面都不一样,有兴趣可以先看一下后面的 TransientStorePool 机制的介绍,然后再去看他的代码,这里就只注明个别参数的含义,不展开看了。

  • MessageStoreConfig.commitIntervalCommitLog

    CommitRealTimeService 线程的间隔,默认 200ms,跟异步刷盘那个线程一致,这个 TransientStorePool 机制通常也是用来替换异步刷盘的,毕竟如果你都开启实时刷盘的模式了,说明数据安全高于性能,而 TransientStorePool 机制下一旦发生意外,丢的可是一大块内存的消息。

  • MessageStoreConfig.commitCommitLogLeastPages

    一次至少提交几个页(Page)的数据,默认为 4。页数不满足的时候会直接跳过,等下次提交。

  • MessageStoreConfig.commitCommitLogThoroughInterval

    两次有效提交的最大间隔,默认 200ms。 注意跟线程间隔区分,因为有前一个参数在,所以 CommitRealTimeService 是有可能拿着一部分的数据一直不提交的,该参数你可以认为是用来强制要求线程提交数据的。

    MessageStoreConfig 里面有一部分属性是有英文注释,可以参考。

ConsumeQueue 刷盘

开始阅读前可以先对比以下 CommitLog 和 ConsumeQueue 的属性字段(field),看起来 ConsumeQueue 似乎没有自己的 Service,实际上是有的,只不过是在 DefaultMessageStore 中。

前文一直说消息写完 commitLog 后会转发给 ConsumeQueue 和 IndexFile,现在我们来看这个转发。

源码位置:org.apache.rocketmq.store.DefaultMessageStore

DefaultMessageStore 应该也算是 RocketMQ-store 的核心类了,它这里管理着很多线程(Service)。 前面提到的线程基本都是它来创建和启动。在它的构造方法里,你能看到如下代码:

// from DefaultMessageStore field
LinkedList<CommitLogDispatcher> dispatcherList;

// from DefaultMessageStore Constructor
this.dispatcherList = new LinkedList<>();
this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());

这里你就把 dispatcherList 看成是任务列表,后面的对象就是任务,这个任务必须实现 void dispatch(final DispatchRequest request);方法。

然后 DefaultMessageStore 中有个 ReputMessageServiceReputMessageService 这个线程就不停的执行 doReput(),实际上有 1ms 间隔,然后这个 doReput() 会根据 dispatchRequest 状态去调用 DefaultMessageStore.this.doDispatch(dispatchRequest);,这个 doDispatch(dispatchRequest) 方法就是遍历前面的任务列表,分别执行任务的 dispatch(DispatchRequest request) 方法,也就是前面的 CommitLogDispatcherBuildConsumeQueue()CommitLogDispatcherBuildIndex()

然后我们来看 CommitLogDispatcherBuildConsumeQueue::dispatch:

  • CommitLogDispatcherBuildConsumeQueue::dispatch

    // DefaultMessageStore.CommitLogDispatcherBuildConsumeQueue::dispatch
    public void dispatch(DispatchRequest request) {
        final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
        switch (tranType) {
            case MessageSysFlag.TRANSACTION_NOT_TYPE:
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE: // 状态 消息已提交
                DefaultMessageStore.this.putMessagePositionInfo(request);
                break;
            case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                break;
        }
    }
    
  • DefaultMessageStore::putMessagePositionInfo

    //DefaultMessageStore::putMessagePositionInfo
    // 前文介绍 ConsumeQueue 时讲过的,根据 Topic 和 QueueID 定位文件
    // 其实就是1级目录/2级目录/cq文件, findConsumeQueue 这个方法自己去看吧
    ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
    // 写入文件的包装方法
    cq.putMessagePositionInfoWrapper(dispatchRequest);
    
  • ConsumeQueue::putMessagePositionInfoWrapper

    这个其实也没啥可看的。

    public void putMessagePositionInfoWrapper(DispatchRequest request) {
      final int maxRetries = 30;
      // cq 文件可写标记
      boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
      for (int i = 0; i < maxRetries && canWrite; i++) {
          long tagsCode = request.getTagsCode();
          // cqExt 扩展文件处理逻辑 直接跳过
          if (isExtWriteEnable()) {
              //...
          }
          // putMessagePositionInfo 方法去写文件
          boolean result = this.putMessagePositionInfo(
                  request.getCommitLogOffset(), // msg.phyoffset
                  request.getMsgSize(),  // msg.size
                  tagsCode, // msg.tag.hashcode
                  request.getConsumeQueueOffset()); // cur cq logic-offset
    
          // 根据结果更新 checkpoint 文件        
          if (result) {
              // Slave or 分布式 commitLog
              if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE ||
                  this.defaultMessageStore.getMessageStoreConfig().isEnableDLegerCommitLog()) {
                  this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(request.getStoreTimestamp());
              }
              this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
    
              // 注意前面是循环写,只要成功就直接 return
              return;
          } else {
              // XXX: warn and notify me
              // log
              try {
                  Thread.sleep(1000);
              } catch (InterruptedException e) {
                  log.warn("", e);
              }
          }
      }
    
      // XXX: warn and notify me
      log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);
      this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
    }
    
  • ConsumeQueue::putMessagePositionInfo

    这个方法本来也不打算贴的,因为 ConsumeQueue 其实也是顺序写,所以处理逻辑跟 commitLog 一样, 都是 mappedFile::appendMessage。 不打算贴的原因是很容易跟前文逻辑混淆,比如代码中 ConsumeQueue 文件的物理偏移量都叫 xxxLogicOffset,按代码表达意思其实就是 commitLog 文件才叫物理偏移量, ConsumeQueue 就是逻辑文件,毕竟 ConsumeQueue 的数据都是可以从 CommitLog 中推理出来的。

    再比如 PhysicOffset 突然就加上了 msg.size,然后还没有注释解释原因,实际上它突然加个 msg.size 只是为了恢复文件的时候跟 commitlog 文件进行比较,毕竟你要判断 commitlog 中的消息是否损坏,它起始位置是判断不出来的(除非消息整个丢了),只有结尾大小不一样,说明这个消息没写全,是损坏的。

    换句话说,ConsumeQueue.maxPhysicOffset 相当于保存了两部分信息,一是消息的物理偏移量,二是消息的正确大小。

private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
    final long cqOffset) {

    // offset:PhysicOffset aka 消息在 commitlog 中的 offset
    // maxPhysicOffset: cq 中存储的消息在 commitlog 中的 offset + msg.size
    // 最新消息的 phyOffset + msg.size 必然大于CQ中已有消息的 maxPhysicOffset
    if (offset + size <= this.maxPhysicOffset) {
        log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset);
        return true;
    }

    this.byteBufferIndex.flip(); // buffer 指针归 0
    this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE); // 20 byte
    this.byteBufferIndex.putLong(offset); // PhysicOffset
    this.byteBufferIndex.putInt(size); // msg.size
    this.byteBufferIndex.putLong(tagsCode); // msg.tag.hashcode

    // 计算消息在 CQ 中的 物理位置
    // cqOffset 在这里是逻辑位置,至少是本文说的 消息在这个 cq 队列中的逻辑位置
    // 但是吧,算出来的 offset,它代码里叫 LogicOffset,明明相同语境下,commitlog 你叫物理位置
    // 到 cq 文件里就变成逻辑位置了,而且这个 offset 是不加 msg.size 的
    final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;

    // 根据预期的 CQ 中的物理偏移量,二分查找 找出对应的 ConsumeQueue 文件,满了则创建信息
    // 这儿的逻辑跟 commitlog 文件是一样的,CQ 里很多处理逻辑跟 CommitLog 中都是同一套
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
    if (mappedFile != null) {

        // 队列为空,起始可写入位置也为0,但是 cq 逻辑偏移量(cqOffset) 不为 0
        // 说明当前要写入 CQ 的消息不是第一条消息,那么先写空结点占位 fillPreBlank
        if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
            // 结合这里看,这个 LogicOffset 的意思大概是,CQ 文件中的物理偏移量都叫 LogicOffset
            this.minLogicOffset = expectLogicOffset;
            this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
            this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
            // 向前面应该有数据的空的CQ结点中写入 空结点:0L + Integer.MAX + 0L
            // 一直填充到 本消息的起始位置为止。
            this.fillPreBlank(mappedFile, expectLogicOffset);
            log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
                + mappedFile.getWrotePosition());
        }

        if (cqOffset != 0) {
            // 这段不用看,就是 offset 对不上的话,打印偏差值的地方。
        }

        // maxPhysicOffset 是 消息结尾位置,而不是消息写入位置
        this.maxPhysicOffset = offset + size;
        return mappedFile.appendMessage(this.byteBufferIndex.array());
    }
    return false;
}

ConsumeQueueExt

ConsumeQueue 扩展文件,ConsumeQueue 只有三个字段,Ext 版本就是比 ConsumeQueue 多了几个字段。 由于多了字段,自然要有单独的逻辑,这部分不展开。

indexFile 刷盘

同 ConsumeQueue 。

文件恢复

要恢复的文件自然就是 CommitLog、ConsumeQueue、IndexFile 这三种。由于后两者的数据均来自 CommitLog,因此恢复时基本以 ComitLog 为准。 这里只介绍 CommitLog、ConsumeQueue,IndexFile 的恢复流程省略,这篇幅已经太长了。

文件恢复是 Broker 启动,或者说 DefaultMessageStore 启动时,加载过程中的一步,也就是 DefaultMessageStore::recover(boolean lastExitOK),无论是正常退出还是异常关闭, recover(lastExitOK) 的第一件事是先恢复 ConsumeQueue 文件,找到最新一条格式正确的消息的记录,并取出该消息的 commitLog-offset 的值,存储到 maxPhyOffsetOfConsumeQueue 中作为下一步的入参。

详见: DefaultMessageStore::load

  • 正常退出

    Broker 正常退出时,会先完成刷盘,然后再关闭 Broker,也就是说内存中所有数据都保存在文件中了。

    此时文件恢复会进入 this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);,从倒数第三个 commitlog 文件开始恢复,不足 3 个文件则从第一个开始恢复。 恢复过程,先 checkMessageAndReturnSize,然后设置 ConsumeQueue。

  • 异常退出

    异常关闭则进入 this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);(注意该方法已标记 Deprecated,代码就不贴了,不建议看,知道大体逻辑就好了。

    这里的逻辑基本上是:利用 CheckPoint 文件中记录的三个时间戳(CommitLog、ConsumeQueue、Index文件最后的刷盘时间戳)中,最小的来进行辅助判定。

    那个方法会从 commitLog 的最后一个文件(int index = mappedFiles.size() - 1;)开始判定,判定逻辑在 isMappedFileMatchedRecover,读出该文件第一条消息的存储时间,如果这个存储时间小于 CheckPoint 文件中的最小刷盘时间(),就可以从这个文件开始恢复,如果大于,则寻找上一个文件(index = index - 1),这里相当于默认了 CheckPoint.getMinTimestampIndex 之前的消息绝对有效。后面流程就跟正常恢复一样了。

    这个 CheckPoint.getMinTimestampIndex 的值如下:

    
      public long getMinTimestampIndex() {
          return Math.min(this.getMinTimestamp(), this.indexMsgTimestamp);
      }
    
      public long getMinTimestamp() {
          long min = Math.min(this.physicMsgTimestamp, this.logicsMsgTimestamp);
    
          min -= 1000 * 3;
          if (min < 0)
              min = 0;
    
          return min;
      }
    
  • recoverTopicQueueTable

    无论是正常退出还是异常关闭,recover 还有最后一步恢复 CommitLog.topicQueueTable 的数据,其实就是初始化这个表里的数据,它里面的数据来自 DefaultMessageStore.consumeQueueTable,而 consumeQueueTable 中的数据则是在 loadConsumeQueue() 时从 ConsumeQueue 文件中读出来的。

TransientStorePool 机制

默认情况下,RocketMQ 是先写页缓存(pageCache),消费消息也是从 pageCache 查,高并发时就会在 broker 日志中发现瞬时 broker busy 的异常,大部分情况下就是 pageCache 太忙。

这个机制的引入就是为了解决这个问题,它采用的方式你可以理解为空间换时间,也可以理解为内存的读写分离

启用该机制后,Broker 收到的消息会先写入堆外内存并立刻返回,然后用异步的方式提交给 pageCache,之后再异步刷盘。这样相当于写消息操作都在堆外内存,而 pageCache 主要面对消息消费,也就是读操作。

不过缺点是,broker 意外退出时,pageCache 还有可能保留一部分数据,堆外内存的数据就全丢了。

总结

熟悉三种文件的设计思路和文件结构,了解刷盘/恢复的大致流程即可。

本文跳过了消息消费相关的一部分内容,后续在消息消费章节再进行说明。

reference

上次编辑于:
贡献者: Lament