RocketMQ 消息存储文件

引言

前面我们已经简单地介绍了 RocketMQ 的整体设计思路,本文着重其中消息存储文件部分,更多关于 RocketMQ 的文章均收录于<RocketMQ系列文章>;

文件详解

了解完了文件存储部分使用的核心技术,在让我们回到 RocketMQ 文件组织的讨论中来,接下来我将挨个分析各个核心文件存储的内容和使用方法。

Commit文件

commitlog 目录的组织方式在前面已经详细介绍过了,该目录下的文件主要存储消息,其特点是每一条消息长度不相同,CommitLog 文件存储的逻辑视图如下图所示,每条消息的前面4个字节存储该条消息的总长度。整个 CommitLog 文件默认大小为 1G。
commit-log-file
在查找消息时,需要先根据要查找的消息偏移找到消息所在的文件,然后根据消息偏移与文件大小取余,得到消息在文件中的位置,最后根据消息大小,取出指定长度的消息内容。

1
2
3
4
5
6
7
8
9
public SelectMappedBufferResult getMessage(final long offset, final int size) {
int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0);
if (mappedFile != null) {
int pos = (int) (offset % mappedFileSize);
return mappedFile.selectMappedBuffer(pos, size);
}
return null;
}

ConsumeQueue文件

RocketMQ 基于主题订阅模式实现消息消费,消费者关心的是一个主题下的所有消息,但由于同一主题的消息不连续地存储在 CommitLog 文件中,试想一下如果消息消费者直接从消息存储文件(CommitLog)中去遍历查找订阅主题下的消息,效率将极其低下,RocketMQ 为了适应消息消费的检索需求,设计了消息消费队列文件(ConsumeQueue),该文件可以看成是 CommitLog 关于消息消费的“索引”文件,消息主题,第二级目录为主题的消息队列。
store-files
为了加快检索速度,并且减少空间使用,ConsumeQueue 不会存储所有消息正文,只会存储如下内容:
consume-queue-file
单个 ConsumeQueue 文件中默认包含 30 万个条目,单个文件的长度为 30w × 20 ≈ 6M 字节, 单个 ConsumeQueue 文件可以看出是一个 ConsumeQueue 条目的数组,其下标为 ConsumeQueue 的逻辑偏移量,消息消费进度存储的偏移量即逻辑偏移量。 ConsumeQueue 即为 CommitLog 文件的索引文件, 其构建机制是当消息到达 CommitLog 文件后, 由专门的线程产生消息转发任务,从而构建消息消费队列文件与下文提到的索引文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public SelectMappedBufferResult getIndexBuffer(final long startIndex) {
/*startIndex 消息索引*/
int mappedFileSize = this.mappedFileSize;
// 根据消息索引 * 20 得到在 ConsumeQueue 中的物理偏移
long offset = startIndex * CQ_STORE_UNIT_SIZE;
if (offset >= this.getMinLogicOffset()) {
// 找到物理索引所在的文件
MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset);
if (mappedFile != null) {
// 物理索引与文件大小取余,得到数据存储的位置,然后通过MappedByteBuffer的到内存映射Buffer
SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize));
return result;
}
}
return null;
}

根据 startIndex 获取准备消费的条目。首先 startIndex * 20 得到在 ConsumeQueue 中的物理偏移量,如果该 offset 小于 minLogicOffset,则返回 null,说明该消息已被删除;如果大于 minLogicOffset,则根据偏移量定位到具体的物理文件,然后通过 offset 与物理文大小取模获取在该文件的偏移量,最终的到从 startIndex 开始,到该 ConsumeQueue 有效结尾的所有数据对应的 MappedByteBuffer。

除了根据消息偏移量查找消息的功能外,RocketMQ 还提供了根据时间戳查找消息的功能,具体实现逻辑如下:

  1. 首先根据时间戳定位到 ConsumeQueue 物理文件,就是从第一个文件开始找到第一个文件更新时间大于该时间戳的文件。
  2. 然后对 ConsumeQueue 中的所有项,使用二分查找,查询每条记录对应的 CommitLog 的最后更新时间和要查询的时间戳
  3. 最终找到与时间戳对应的 ConsumeQueue 偏移,或者离时间戳最近的消息的 ConsumeQueue 偏移

Index索引文件

消息消费队列是 RocketMQ 专门为消息订阅构建的索引文件,提高根据主题与消息队列检索消息的速度,另外 RocketMQ 引入了 Hash 索引机制为消息建立索引,HashMap 的设计包含两个基本点: Hash槽 与 Hash 冲突的链表结构。
index-file
从图中可以看出,indexFile 总共包含 IndexHeader、 Hash 槽、 Hash 条目(数据)。

IndexHeader
IndexHeader头部,包含 40 个字节,记录该 IndexFile 的统计信息,其结构如下。

  • beginTimestamp: 该索引文件中包含消息的最小存储时间。
  • endTimestamp: 该索引文件中包含消息的最大存储时间。
  • beginPhyOffset: 该索引文件中包含消息的最小物理偏移量(CommitLog 文件偏移量)。
  • endPhyOffset:该索引文件中包含消息的最大物理偏移量(CommitLog 文件偏移量)。
  • hashSlotCount: hashSlot个数,并不是 hash 槽使用的个数,在这里意义不大。
  • indexCount: Index条目列表当前已使用的个数,Index条目在Index条目列表中按顺序存储。

Hash槽
Hash槽,一个 IndexFile 默认包含500万个 Hash 槽,每个 Hash 槽存储的是落在该 Hash 槽的 hashcode 最新的 Index 的索引。

Hash 条目
Index条目列表,默认一个索引文件包含 2000 万个条目,每一个 Index 条目结构如下。

  • hashcode: key 的 hashcode。
  • phyOffset: 消息对应的物理偏移量。
  • timeDif:该消息存储时间与第一条消息的时间戳的差值,小于 0 该消息无效。
  • preIndexNo:该条目的前一条记录的 Index 索引,当出现 hash 冲突时,构建的链表结构。

Index文件的写入步骤如下:

  1. 如果当前已使用条目大于等于允许最大条目数时,则返回 false,表示当前索引文件已写满。如果当前索引文件未写满则根据 key 算出 key 的 hashcode,然后 keyHash 对 hash 槽数量取余定位到 hashcode 对应的 hash 槽下标, hashcode对应的hash槽的物理地址 = IndexHeader 头部(40字节) + 下标 * 每个 hash 槽的大小(4字节)。
  2. 读取 hash 槽中存储的数据,如果 hash 槽存储的数据小于 0 或大于当前索引文件中存储的最大条目,则将该槽的值设置为 0。
  3. 将条目信息存储在 IndexFile 中。
    1. 计算新添加条目的起始物理偏移量,等于头部字节长度 + hash 槽数量单个 hash 槽大小(4个字节) + 当前 Index 条目个数单个 Index 条目大小(20个字节)。
    2. 依次将 hashcode、消息物理偏移量、时间差timeDif、原来 Hash 槽的值存入该索引条目中。
    3. 将新添加的索引条目索引存入 hash 槽中,覆盖原来的值。
  4. 更新文件索引头信息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
// 判断是否写满了
if (this.indexHeader.getIndexCount() < this.indexNum) {
// 计算 hash 槽的位置
int keyHash = indexKeyHashMethod(key);
int slotPos = keyHash % this.hashSlotNum;
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
try {
// 获取原来槽内的值
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
// <= 0 或者大于当前存储数,则认为其无效
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
slotValue = invalidIndex;
}
// 计算时间差值
long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
timeDiff = timeDiff / 1000;
if (this.indexHeader.getBeginTimestamp() <= 0) {
timeDiff = 0;
} else if (timeDiff > Integer.MAX_VALUE) {
timeDiff = Integer.MAX_VALUE;
} else if (timeDiff < 0) {
timeDiff = 0;
}
// 计算索引条目的位置
int absIndexPos =
IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
+ this.indexHeader.getIndexCount() * indexSize;
// 写入索引条目
this.mappedByteBuffer.putInt(absIndexPos, keyHash);
this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
// 更新槽的值
this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
// 更新头部数据
if (this.indexHeader.getIndexCount() <= 1) {
this.indexHeader.setBeginPhyOffset(phyOffset);
this.indexHeader.setBeginTimestamp(storeTimestamp);
}
this.indexHeader.incHashSlotCount();
this.indexHeader.incIndexCount();
this.indexHeader.setEndPhyOffset(phyOffset);
this.indexHeader.setEndTimestamp(storeTimestamp);
return true;
} catch (Exception e) {
log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
}
} else {
log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
+ "; index max num = " + this.indexNum);
}
return false;
}

至此,索引文件的写入套路就已经介绍完了,它通过 hash 槽存储了 hash 冲突链表的头指针,然后每个索引项都保存了前一个索引项的指针,借此,在文件存储中实现了链表的数据结构。

当根据 key 查找消息时,不光可以设置要查找 key 还可以设置最大查找数量,开始时间戳,结束时间戳,操作步骤如下:

  1. 根据 key 计算 hashcode,然后 keyHash 对 hash 槽数量取余定位到 hashcode 对应的 hash 槽下标。
  2. 如果对应的 Hash 槽中存储的数据小于 1 或大于当前索引条目个数则表示该 HashCode 没有对应的条目,直接返回。
  3. 由于会存在 hash 冲突,根据 slotValue 定位该 hash 槽最新的一个 Item 条目,将存储的物理偏移加入到 phyOffsets 中 ,然后继续验证Item条目中存储的上一个 Index 下标,如果大于等于 1 并且小于最大条目数,则继续查找,否则结束查找。
  4. 根据 Index 下标定位到条目的起始物理偏移量,然后依次读取 hashcode、 物理偏移量、时间差、上一个条目的Index下标,循环步骤4。
    • 如果存储的时间差小于 0,则直接结束;如果 hashcode 匹配并且消息存储时间介于待查找时间start、 end之间则将消息物理偏移量加入到phyOffsets
    • 验证条目的前一个 Index 索引,如果索引大于等于 1 并且小于Index条目数,则继续查找,否则结束整个查找。

具体的实现代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,
final long begin, final long end) {
if (this.mappedFile.hold()) {
// 计算 hash 槽的位置
int keyHash = indexKeyHashMethod(key);
int slotPos = keyHash % this.hashSlotNum;
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
try {
// 获取 hash 槽内存的索引位置
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
// 验证合法性
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()
|| this.indexHeader.getIndexCount() <= 1) {
} else {
// 遍历索引链表
for (int nextIndexToRead = slotValue; ; ) {
// 数量够了就退出
if (phyOffsets.size() >= maxNum) {
break;
}
// 找到本条索引的位置
int absIndexPos =
IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
+ nextIndexToRead * indexSize;
// 读取索引条目的内容
int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);
long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);
// 验证时间合法性
if (timeDiff < 0) {
break;
}
timeDiff *= 1000L;
long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
boolean timeMatched = (timeRead >= begin) && (timeRead <= end);
// 验证整体合法性
if (keyHash == keyHashRead && timeMatched) {
phyOffsets.add(phyOffsetRead);
}
// 验证链表中下一个节点的合法性,如何合法则继续循环,否则退出
if (prevIndexRead <= invalidIndex
|| prevIndexRead > this.indexHeader.getIndexCount()
|| prevIndexRead == nextIndexToRead || timeRead < begin) {
break;
}
nextIndexToRead = prevIndexRead;
}
}
} catch (Exception e) {
log.error("selectPhyOffset exception ", e);
}
}
}

CheckPoint文件

checkpoint 的作用是记录 CommitLog、ConsumeQueue、Index文件的刷盘时间点,文件固定长度为 4k,其中只用该文件的前面 24 个字节,其存储格式如下图所示。
check-point-file

  • physicMsgTimestamp: CommitLog文件刷盘时间点。
  • logicsMsgTimestamp: 消息消费队列文件刷盘时间点。
  • indexMsgTimestamp: 索引文件刷盘时间点。

更新ConsumeQueue和IndexFile

消息消费队列文件、消息属性索引文件都是基于 CommitLog 文件构建的,当消息生产者提交的消息存储在 CommitLog 文件中,ConsumeQueue、IndexFile需要及时更新,否则消息无法及时被消费,根据消息属性查找消息也会出现较大延迟。RocketMQ 通过开启一个线程 ReputMessageService 来准时转发 CommitLog 文件更新事件,相应的任务处理器根据转发的消息及时更新 ConsumeQueue、IndexFile文件。

文件恢复

由于 RocketMQ 存储首先将消息全量存储在 CommitLog 文件中,然后异步生成转发任务更新 ConsumeQueue、Index 文件。如果消息成功存储到 CommitLog 文件中,转发任务未成功执行,此时消息服务器 Broker 由于某个原因宕机,导致 CommitLog、ConsumeQueue、IndexFile文件数据不一致。如果不加以人工修复的话,会有一部分消息即便在 CommitLog 文件中存在,但由于并没有转发到 ConsumeQueue,这部分消息将永远不会被消费者消费。

接下来我们看一看 RocketMQ Broker 的启动过程:

  1. 判断上一次退出是否正常。
    • Broker在启动时创建abort文件,在退出时通过注册 JVM 钩子函数删除 abort 文件。如果下一次启动时存在 abort 文件。 说明 Broker 是异常退出的,CommitLog 与 ConsumeQueue 数据有可能不一致,需要进行修复。
  2. 加载延迟队列,RocketMQ 定时消息相关。
  3. 加载所有 CommitLog 文件,如果文件大小和配置单文件大小不一致则忽略,创建好了将wrotePosition、flushedPosition, committedPosition三个指针都指向文件结尾。后面的恢复过程会将这些指针修正。
  4. 加载消息 ConsumeQueue文件。与加载 CommitLog 大致相同。
  5. 加载存储检测点,检测点主要记录 commitLog 文件、ConsumeQueue 文件、Index 索引文件的刷盘点。
  6. 加载索引文件,如果上次异常退出,而且索引文件上次刷盘时间小于该索引文件最大的消息时间戳该文件将立即销毁。
  7. 根据 Broker 是否是正常停止执行不同的恢复策略,下文将分别介绍异常停止、正常停止的文件恢复机制。
  8. 恢复 ConsumeQueue 文件后,将在 CommitLog 实例中保存每个消息消费队列当前的存储逻辑偏移量,这也是消息中不仅存储主题、消息队列 ID 还存储了消息队列偏移量的关键所在。

Broker 正常停止

  1. Broker正常停止再重启时,从倒数第三个文件开始进行恢复,如果不足 3 个文件,则从第一个文件开始恢复。
  2. 从要恢复的 CommitLog 中,按照读到的消息大小读出消息正文,然后使用CRC(循环冗余校验)判断消息是否正确。
  3. 遍历 CommitLog 文件,每次取出一条消息,如果检查结果为 true 并且消息的长度大于 0 表示消息正确,校验指针移动到本条消息的末尾;如果查找结果为 true 并且消息的长度等于 0,表示已到该文件的末尾,如果还有下一个文件需要检查,则循环步骤3,否则跳出循环; 如果查找结构为 false,表明该文件未填满所有消息,跳出循环,结束遍历文件。
  4. 通过步骤 3,最终会得到一个校验通过的偏移 offset,通过它来更新 commit 指针和 flush 指针。
  5. 删除 offset 之后的所有文件。

正常停止的时,Broker 会将 IndexFile 和 ConsumeQueue 都更新好,所以如果 Broker 正常停止的话,恢复过程只是修正commit 指针和 flush 指针。

Broker 异常停止

异常文件恢复的步骤与正常停止文件恢复的流程基本相同,其主要差别有两个。首先,正常停止默认从倒数第三个文件开始进行恢复,而异常停止则需要从最后一个文件往前走,找到第一个消息存储正常的文件。其次,如果 CommitLog 目录没有消息文件,如果在消息消费队列 ConsumeQueue 目录下存在文件,则需要销毁。
如何判断一个 CommitLog 文件是正确的呢?

  1. 首先判断文件的魔数
  2. 如果文件中第一条消息的存储时间等于 0,则认为文件无效
  3. 对比文件第一条消息的时间戳与检测点,文件第一条消息的时间戳小于文件检测点 checkpoint 说明该文件部分消息是可靠的,则从该文件开始恢复。
  4. 如果根据前 3 步算法找到了合法的 CommitLog,则遍历 CommitLog 中的消息,验证消息的合法性,并将消息重新转发到消息消费队列与索引文件,这样会造成 ConsumeQueue 的冗余,这需要消息的消费者来实现幂等性。
  5. 如果步骤3未找到有效 CommitLog,则设置 CommitLog 目录的 flush 指针、 commit 指针都为 0,并销毁消息消费队列文件。

异常停止时,不确定 ConsumeQueue 和 IndexFile 是否正确,所以从最后一个有效文件,重新发送 CommitLog 变动事件,从而触发 ConsumeQueue 和 IndexFile 的更新。

我认为这里有问题,这里只重发最后一个有效文件的 CommitLog 变动事件,如果倒数第二个文件的最后几条改动事件还没有被处理时,创建了新的 CommitLog 文件(最后一个 CommitLog文件)并且成功写入了数据,并刷新了 checkpoint。那么倒数第二个 CommitLog 的最后几条消息就丢失了。为此,我在官方 Github 仓库提了一个 issue,但是目前未收到回复。

而且,如果最后一个 CommitLog 几乎写完,那么也会产生很多的冗余消息,我觉得这里可以从 ConsumeQueue 的最大索引处开始,顺序恢复所有有效的 CommitLog 内容。

刷盘机制

RocketMQ 的存储与读写是基于 JDK NIO 的内存映射机制(MappedByteBuffer)的,消息存储时首先将消息追加到内存,再根据配置的刷盘策略在不同时间进行刷写磁盘。如果是同步刷盘,消息追加到内存后,将同步调用 MappedByteBuffer 的 force 方法;如果是异步刷盘,在消息追加到内存后立刻返回给消息发送端。RocketMQ 使用一个单独的线程按照某一个设定的频率执行刷盘操作。通过在 broker 配置文件中配置 flushDiskType 来设定刷盘方式,可选值为 ASYNC_FLUSH (异步刷盘)、SYNC_FLUSH (同步刷盘),默认为异步刷盘。 ConsumeQueue、IndexFile 刷盘的实现相对于 CommitLog 刷盘机制来说都很简单,ConsumeQueue 是周期性刷盘,索引文件的刷盘并不是采取定时刷盘机制,而是每次想要更新一次索引文件就会将之前的改动刷写到磁盘。接下来我将主要介绍 CommitLog 的刷盘过程。

同步刷盘

同步刷盘,指的是在消息追加到内存映射文件的内存中后,立即将数据从内存刷写到磁盘文件,CommitLog 中有一个刷盘服务 GroupCommitService,所有消息发送线程接收到的同步写入请求,最终都会以请求-回应的方式通知 GroupCommitService 代其进行刷盘操作。当 GroupCommitService 执行完刷盘任务,或者刷盘任务执行超时时,发送线程才会回复消息的 Producer。

我觉得这里引入 GroupCommitService 的意义主要有如下几点:

  1. 避免锁竞争
  2. 抽象出的Request-Response模型,可以用来实现超时机制
  3. 避免无意义的刷新调用,每次刷盘都会刷新最新写入的所有数据,这样如果有实际已经被刷新过的请求过来,只要判断刷新指针就能快速知道是否已经完成
  4. 保证了刷盘的顺序

接下来我们看看 GroupCommitService 的核心内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
// 接收到的请求,直接写入requestsWrite
private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();
// 刷盘时,从requestsRead读取刷盘请求,有两个队列的意义是将读写过程的锁冲突消除,后面大家就会看到实际的操作过程
private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();
// 添加刷盘请求
public synchronized void putRequest(final GroupCommitRequest request) {
// 写锁
synchronized (this.requestsWrite) {
this.requestsWrite.add(request);
}
// 通知:有新的请求过来了
if (hasNotified.compareAndSet(false, true)) {
waitPoint.countDown(); // notify
}
}
// 交换读写队列,依靠它完成读写锁冲突分离
private void swapRequests() {
List<GroupCommitRequest> tmp = this.requestsWrite;
this.requestsWrite = this.requestsRead;
this.requestsRead = tmp;
}
// 实际刷盘过程
private void doCommit() {
// 读锁
synchronized (this.requestsRead) {
if (!this.requestsRead.isEmpty()) {
for (GroupCommitRequest req : this.requestsRead) {
// There may be a message in the next file, so a maximum of
// two times the flush
boolean flushOK = false;
for (int i = 0; i < 2 && !flushOK; i++) {
flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
if (!flushOK) {
CommitLog.this.mappedFileQueue.flush(0);
}
}
req.wakeupCustomer(flushOK);
}
long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
if (storeTimestamp > 0) {
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
}
this.requestsRead.clear();
} else {
// Because of individual messages is set to not sync flush, it
// will come to this process
CommitLog.this.mappedFileQueue.flush(0);
}
}
}
// 线程循环
public void run() {
CommitLog.log.info(this.getServiceName() + " service started");

while (!this.isStopped()) {
try {
this.waitForRunning(10);
// 等待结束后,会交换读写队列
this.doCommit();
} catch (Exception e) {
CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
// Under normal circumstances shutdown, wait for the arrival of the
// request, and then flush
try {
Thread.sleep(10);
} catch (InterruptedException e) {
CommitLog.log.warn("GroupCommitService Exception, ", e);
}
synchronized (this) {
this.swapRequests();
}
this.doCommit();
CommitLog.log.info(this.getServiceName() + " service end");
}

异步刷盘

异步刷盘根据是否开启 transientStorePoolEnable 机制,刷盘实现会有细微差别。如果 transientStorePoolEnable 为 true, RocketMQ 会单独申请一个与目标物理文件 (CommitLog) 同样大小的堆外内存,该堆外内存将使用内存锁定,确保不会被置换到虚拟内存中去,消息首先追加到堆外内存,然后提交到 FileChannel 中,再 flush 到磁盘。 如果 transientStorePoolEnable 为 false,消息直接追加到与物理文件直接映射的内存中,然后刷写到磁盘中。

当 transientStorePoolEnable 为 true时,会有一个 CommitRealTimeService 默认每隔 200ms 将直接内存中的数据提交到 FileChannel,一次提交默认至少要包含 4 个页的数据,否则暂时不提交。当 transientStorePoolEnable 为 false 时,这个 CommitRealTimeService 实际上什么都没做。

然后是定时刷盘的逻辑,CommitLog 会有一个 FlushRealTimeService 定时将数据刷入磁盘,默认每隔 10s 进行一次刷盘,和 commit 过程一样,刷盘阶段默认也是至少攒够 4 个页的脏数据才进行刷盘,当 transientStorePoolEnable 为 true时,刷盘过程调用的是 FileChannel 的 force,否则调用的是 MappedByteBuffer 的 force。

过期删除机制

由于 RocketMQ 操作 CommitLog、ConsumeQueue文件是基于内存映射机制并在启动的时候会加载 CommitLog、ConsumeQueue 目录下的所有文件,为了避免内存与磁盘的浪费,不可能将消息永久存储在消息服务器上,所以需要引人一种机制来删除己过期的文件。 RocketMQ 顺序写 CommitLog 文件、ConsumeQueue 文件,所有写操作全部落在最后一个 CommitLog 或 ConsumeQueue 文件上,之前的文件在下一个文件创建后将不会再被更新。RocketMQ 清除过期文件的方法是: 如果非当前写文件在一定时间间隔内没有再次被更新,则认为是过期文件,可以被删除,RocketMQ 不会关注这个文件上的消息是否全部被消费。默认每个文件的过期时间为 72 小时 ,通过在 Broker 配置文件中设置 fileReservedTime 来改变过期时间,单位为小时。

RocketMQ 会每隔 10s 调度一次清除过程,检测是否需要清除过期文件。

参考内容

[1]《RocketMQ技术内幕》
[2]《RocketMQ实战与原理解析》
[3] 老生常谈——利用消息队列处理分布式事务
[4] RocketMQ架构解析
[5] MappedByteBuffer VS FileChannel 孰强孰弱?
[6] 文件 IO 操作的一些最佳实践
[7] 海量数据处理之Bloom Filter详解
[8] rocketmq GitHub Wiki

贝克街的流浪猫 wechat
您的打赏将鼓励我继续分享!
  • 本文作者: 贝克街的流浪猫
  • 本文链接: https://www.beikejiedeliulangmao.top/middleware/rocketmq/store-message-file/
  • 版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!
  • 创作声明: 本文基于上述所有参考内容进行创作,其中可能涉及复制、修改或者转换,图片均来自网络,如有侵权请联系我,我会第一时间进行删除。