RocketMQ 消息消费

引言

前面我们已经简单地介绍了 RocketMQ 的整体设计思路,本文着重其中消息消费部分的实现细节,更多关于 RocketMQ 的文章均收录于<RocketMQ系列文章>;

消息消费

消息消费以组的模式开展,一个消费组内可以包含多个消费者,每一个消费组可订阅多个主题,消费组之间有集群模式与广播模式两种消费模式。集群模式,主题下的同一条消息只允许被其中一个消费者消费。广播模式,主题下的同一条消息将被集群内的所有消费者消费一次。消息服务器与消费者之间的消息传送也有两种方式:推模式、拉模式。所谓的拉模式,是消费端主动发起拉消息请求,而推模式是消息到达消息服务器后,推送给消息消费者。RocketMQ 消息推模式的实现基于拉模式,在拉模式上包装一层,一个拉取任务完成后开始下一个拉取任务。

消息队列负载机制遵循一个通用的思想: 一个消息队列同一时间只允许被一个消费者消费,一个消费者可以消费多个消息队列。

RocketMQ 支持局部顺序消息消费,也就是保证同一个消息队列上的消息顺序消费。不支持消息全局顺序消费,如果要实现某一主题的全局顺序消息消费,可以将该主题的队列数设置为 1,牺牲高可用性。

RocketMQ 支持两种消息过滤模式:表达式(TAG、SQL92)与类过滤模式。

消费者启动

  1. 构建主题订阅信息
    • 订阅目标topic
    • 订阅重试主题消息。RocketMQ消息重试是以消费组为单位,而不是主题,消息重试主题名为 %RETRY% + 消费组名。消费者在启动的时候会自动订阅该主题,参与该主题的消息队列负载。
  2. 初始化消息进度。如果消息消费是集群模式,那么消息进度保存在 Broker 上; 如果是广播模式,那么消息消费进度存储在消费端。
  3. 根据是否是顺序消费,创建消费端消费线程服务。ConsumeMessageService 主要负责消息消费,内部维护一个线程池。

消息拉取

我们会基于 PUSH 模型来介绍拉取机制,因为其内部包括了 PULL 模型。消息消费有两种模式:广播模式与集群模式,广播模式比较简单,每一个消费者需要去拉取订阅主题下所有消费队列的消息,接下来主要基于集群模式介绍。在集群模式下,同一个消费组内有多个消息消费者,同一个主题存在多个消费队列,消息队列负载,通常的做法是一个消息队列在同一时间只允许被一个消息消费者消费,一个消息消费者可以同时消费多个消息队列。

RocketMQ 使用一个单独的线程 PullMessageService 来负责消息的拉取。

1
2
3
4
5
6
7
8
9
10
11
12
13
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}

PullMessageService 从服务端拉取到消息后,会根据消息对应的消费组,转给该组对应的 ProcessQueue,而 ProcessQueue 是 MessageQueue 在消费端的重现、快照。 PullMessageService 从消息服务器默认每次拉取 32 条消息,按消息的队列偏移量顺序存放在 ProcessQueue 中,PullMessageService 然后将消息提交到消费者消费线程池,消息成功消费后从 ProcessQueue 中移除。

消息拉取分为 3 个主要步骤。

  1. 消息拉取客户端消息拉取请求封装。
  2. 消息服务器查找并返回消息。
  3. 消息拉取客户端处理返回的消息。

发送拉取请求

  1. 判断队列状态,如果不需要拉取则退出
  2. 进行消息拉取流控
    • 消息处理总数
    • 消息偏移量跨度
  3. 查询路由表,找到要发送的目标 Broker 服务器,如果没找到就更新路由信息
  4. 如果消息过滤模式为类过滤,则需要根据主题名称、broker地址找到注册在 Broker上的 FilterServer 地址,从 FilterServer 上拉取消息,否则从 Broker 上拉取消息
  5. 发送消息

Broker组装消息

  1. 根据订阅信息,构建消息过滤器
    • tag 过滤器只会过滤 tag 的 hashcode,为了追求高效率
    • SQL 过滤为了避免每次执行 SQL表达式,构建了 BloomFilter,在 Redis 防止缓存击穿那里我们也用过它
  2. 根据主题名称与队列编号获取消息消费队列
  3. 根据拉取消息偏移量,进行校对,如何偏移量不合法,则返回相应的错误码
  4. 如果待拉取偏移量大于 minOffset 并且小于 maxOffs 时,从当前 offset 处尝试拉取 32 条消息,根据消息队列偏移量(ConsumeQueue)从 CommitLog 文件中查找消息
  5. 根据 PullResult 填充 responseHeader 的 nextBeginOffset、 minOffset、 maxOffset
  6. 如果主 Broker 工作繁忙,会设置 flag 建议消费者下次从 Slave 节点拉取消息
  7. 如果 CommitLog 标记可用并且当前节点为主节点,则更新消息消费进度

Bloom Filter是一种空间效率很高的随机数据结构,它的原理是,当一个元素被加入集合时,通过K个Hash函数将这个元素映射成一个位阵列(Bit array)中的K个点,把它们置为1。检索时,我们只要看看这些点是不是都是1就(大约)知道集合中有没有它了:如果这些点有任何一个0,则被检索元素一定不在;如果都是1,则被检索元素很可能在。这就是布隆过滤器的基本思想。
但Bloom Filter的这种高效是有一定代价的:在判断一个元素是否属于某个集合时,有可能会把不属于这个集合的元素误认为属于这个集合(false positive)。因此,Bloom Filter不适合那些“零错误”的应用场合。而在能容忍低错误率的应用场合下,Bloom Filter通过极少的错误换取了存储空间的极大节省。

客户端处理消息

  1. 解码成消息列表,并进行消息过滤
    • 这里之所以还要进行过滤,是因为 Broker 为了追求效率只会根据 tag 的 hashcode 进行过滤,真实 key string 的对比,下放到 Consumer 上进行
  2. 更新 PullRequest 的下一次拉取偏移量,如果过滤后没有一条消息的话,则立即触发下次拉取
  3. 首先将拉取到的消息存入 ProcessQueue,然后将拉取到的消息提交到 ConsumeMessageService 中供消费者消费,该方法是一个异步方法,也就是 PullCallBack 将消息提交到 ConsumeMessageService 中就会立即返回
  4. 根据拉取延时,适时进行下一次拉取

pull-message
RocketMQ 并没有真正实现推模式,而是消费者主动向消息服务器拉取消息,RocketMQ 推模式是循环向消息服务端发送消息拉取请求,如果消息消费者向 RocketMQ 发送消息拉取时,消息并未到达消费队列,会根据配置产生不同效果:

  • 不启用长轮询机制:在服务端等待 shortPollingTimeMills=1s 时间后(挂起)再去判断消息是否已到达消息队列,如果消息未到达则提示消息拉取客户端 PULL_NOT_FOUND (消息不存在)
  • 开启长轮询模式: RocketMQ 一方面会每 5s 轮询检查一次消息是否存在,同时一有新消息到达后立马通知挂起线程再次验证新消息是否是自己感兴趣的消息,如果是, 则从 CommitLog 文件提取消息返回给消息拉取客户端,否则等到挂起超时,超时时间由消息拉取方在消息拉取时封装在请求参数中,PUSH 模式默认为 15s

当新消息达到 CommitLog 时,ReputMessageService 线程负责将消息转发给 ConsumeQueue、IndexFile,如果 Broker 端开启了长轮询模式并且角色主节点,则最终将调用 PullRequestHoldService 线程的 notifyMessageArriving 方法唤醒挂起线程,判断当前消费队列最大偏移量是否大于待拉取偏移量,如果大于则拉取消息。长轮询模式使得消息拉取能实现准实时。

队列负载均衡

在 RocketMQ 中,Consumer 端的两种消费模式(Push/Pull)都是基于拉模式来获取消息的,而在 Push 模式只是对 Pull 模式的一种封装,其本质实现为消息拉取线程在从服务器拉取到一批消息后,然后提交到消息消费线程池后,又“马不停蹄”的继续向服务器再次尝试拉取消息。如果未拉取到消息,则延迟一下又继续拉取。在两种基于拉模式的消费方式(Push/Pull)中,均需要 Consumer 端在知道从 Broker 端的哪一个消息队列—队列中去获取消息。因此,有必要在 Consumer 端来做负载均衡,即 Broker 端中多个 MessageQueue 分配给同一个 ConsumerGroup 中的哪些 Consumer 消费。

在 Consumer 启动后,它就会通过定时任务不断地向 RocketMQ 集群中的所有 Broker 实例发送心跳包(其中包含了,消息消费分组名称、订阅关系集合、消息通信模式和客户端id的值等信息)。Broker 端在收到 Consumer 的心跳消息后,会将它维护在 ConsumerManager 的本地缓存变量 ConsumerTable,同时并将封装后的客户端网络通道信息保存在本地缓存变量 ChannelInfoTable 中,为之后做 Consumer 端的负载均衡提供可以依据的元数据信息。

Consumer 的 RebalanceService 会每隔20s执行一次负载均衡。它会根据消费者通信类型为“广播模式”还是“集群模式”做不同的逻辑处理。因为广播模式,每个 Consumer 都会订阅所有队列的内容,实现很简单,所以这里主要来看下集群模式下的主要处理流程:

  1. 从本地缓存变量 TopicSubscribeInfoTable 中,获取该 Topic 主题下的消息消费队列集合
  2. 向各个 Broker 端发送获取该消费组下消费者Id列表
  3. 先对 Topic 下的消息消费队列、消费者 Id 排序,然后用消息队列分配策略算法(默认为:消息队列的平均分配算法),计算出待拉取的消息队列
  4. 根据计算出来的新负载均衡结果,更新本地的队列消费任务
    • 删除已经不由自己负责的队列消费任务
    • 添加新的由自己负责的队列消费任务,从 Broker 中读取该队列的消费偏移 Offset,然后开始消费任务

RocketMQ 的负载均衡过程并没有通过选主分配的过程进行,而是各个节点自行计算,我觉得主要是为了实现方便,而且 RocketMQ 也不追求一个消息只被消费一次,如果负载均衡的结果出现了短暂冲突(最终应该会趋于一致),也可以靠 Consumer 实现幂等性解决。

消息消费过程

  1. 当 Consumer 拉取收新的消息时,会将这些消息以 32 个为一组,提交给消息消费者线程池
  2. 线程池进行实际消费时,会确认当前消息队列是否仍然归自己管辖(重新负载均衡时,将该队列分配给了别的消费者)
  3. 恢复延时消息主题名
    • RocketMQ 将消息存入 CommitLog 文件时,如果发现消息是延时消息,会首先将原主题存入在消息的属性中,然后设置主题名称为 SCHEDULE_TOPIC,以便时间到后重新参与消息消费。
  4. 执行具体的消息消费函数,最终将返回 CONSUME_SUCCESS (消费成功)或 RECONSUME_LATER (需要重新消费)
  5. 如果业务代码返回 RECONSUME_LATER,根据模式作出不同的处理
    • 广播模式:什么都不处理,只打印log
    • 集群模式:回发消费失败的消息,进行重新消费,如果发送失败,则再次尝试自己消费
  6. 根据消费成功的消息,计算消费者线程池的剩余消息数量和大小,然后更新offset
    • 由于可能会出现一组消息只有后半段被消费成功的情况,所以最终的 offset 为剩余消息池中最小的 offset,这就势必会出现重复消费

重试消息

RocketMQ 提供了几个重试消息的延时级别: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 1Om 20m 30m 1h 2h,同时也有消息最大重新消费次数,如果超过了最大重新消费次数则会被单独存储起来,等待人工处理。

重试消息会被存入名为”%RETRY%+消费组名称”的主题中,原始主题会存入属性中。然后会基于定时任务机制,在到期时将任务再次拉取出来。

消费进度管理

  • 广播模式: 同一个消费组的所有消息消费者都需要消费主题下的所有消息,也就是同组内的消费者的消息消费行为是对立的,互相不影响,故消息进度需要独立存储,最理想的存储地方应该是与消费者绑定。这些数据最终会存储在 Consumer 节点的磁盘文件中,采用周期性刷盘的形式存储。
  • 集群模式: 同一个消费组内的所有消息消费者共享消息主题下的所有消息,同一条消息(同一个消息消费队列)在同一时间只会被消费组内的一个消费者消费,并且随着消费队列的动态变化重新负载,所以消费进度需要保存在一个每个消费者都能访问到的地方————Broker,在需要更新 Offset 时,会以网络请求的形式更新 Broker 中存储的 Offset。

change-broker-offset
消费者线程池每处理完一个消息消费任务(ConsumeRequest)时会从 ProcessQueue 中移除本批消费的消息,并返回 ProcessQueue 中最小的偏移量,用该偏移量更新消息队列消费进度,如果 ProcessQueue 中的消息 Offset 分别为 [10,30,40,50],这时候消费了30,40,最后的 Offset 仍然为 10。只有当 Offset = 10 的消息被消费后,Offset 才会变为 50。正因为如此,RocketMQ 才会有根据消息 Offset 跨度进行流量控制的功能。

此外,值得一提的是,当发生重新负载均衡后,如果某一队列被分配给了其他消费者,那么该队列对应的 Offset 也会从本机中消除。

顺序消息

RocketMQ 支持局部消息顺序消费,可以确保同一个消息消费队列中的消息被顺序消费,如果需要做到全局顺序消费则可以将主题配置成一个队列。

消息队列负载

如果经过消息队列重新负载(分配)后,分配到新的消息队列时,首先需要尝试向 Broker 发起锁定该消息队列的请求,如果返回加锁成功则创建该消息队列的拉取任务,否则将跳过,等待其他消费者释放该消息队列的锁,然后在下一次队列重新负载时再尝试加锁。如果重新分配后,发现某一队列已不由自己负责,会主动的释放该队列的锁。除此之外,锁的最大存活时间是 60s,如果超过 60s 未续锁,则自动释放。

顺序消息消费与并发消息消费的第一个关键区别: 顺序消息在创建消息队列拉取任务时需要在 Broker 服务器锁定该消息队列。

消息拉取方式

消息拉取过程中,先会判断该消息队列是否被锁定,如果未被自己锁定,则会延迟一段时间后,再进行拉取任务。

消息消费方式

如果消费模式为集群模式,启动定时任务,默认每隔 20s 锁定一次分配给自己的消息消费队列(锁的保活)。

在 ConsumeMessageOrderlyService 消费消息时,先会获取内存中的队列锁。也就是说,一个消息消费队列同一时刻只会被一个消费线程池中一个线程消费。除此之外,其他过程基本和并发消费的过程一致。

参考内容

[1]《RocketMQ技术内幕》
[2]《RocketMQ实战与原理解析》
[3] 老生常谈——利用消息队列处理分布式事务
[4] RocketMQ架构解析
[5] MappedByteBuffer VS FileChannel 孰强孰弱?
[6] 文件 IO 操作的一些最佳实践
[7] 海量数据处理之Bloom Filter详解
[8] rocketmq GitHub Wiki

贝克街的流浪猫 wechat
您的打赏将鼓励我继续分享!
  • 本文作者: 贝克街的流浪猫
  • 本文链接: https://www.beikejiedeliulangmao.top/middleware/rocketmq/consume-message/
  • 版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!
  • 创作声明: 本文基于上述所有参考内容进行创作,其中可能涉及复制、修改或者转换,图片均来自网络,如有侵权请联系我,我会第一时间进行删除。