RocketMQ 常见问题

引言

至此,我们已经介绍完了 RocketMQ 的所有实现细节,最后我们简单地介绍一下使用 RocketMQ 时常见的问题,更多关于 RocketMQ 的文章均收录于<RocketMQ系列文章>;

常见问题

顺序消息方案

顺序消息是指消息的消费顺序和产生顺序相同,在有些业务逻辑下,必须保证顺序。比如订单的生成、付款、发货,这 3 个消息必须按顺序处理才行。顺序消息分为全局顺序消息和部分顺序消息,全局顺序消息指某个 Topic 下的所有消息都要保证顺序;部分顺序消息只要保证每一组消息被顺序消费即可,比如上面订单消息的例子,只要保证同一个订单 ID 的三个消息能按顺序消费即可。

  • 全局顺序消息:一个 topic 只留一个队列,并且顺序消费,这时高并发、高吞吐量的功能完全用不上了。
  • 部分顺序消息:在发送端,要做到把同一业务 ID 的消息发送到同一个 Message Queue(hash 映射);在消费过程中,要做到从同一个 Message Queue 读取的消息不被并发处理,Consumer 使用 MessageListenerOrderly 的时候,这样才能达到部分有序。

消息重复问题

解决消息重复有两种方法:第一种方法是保证消费逻辑的幂等性(多次调用和一次调用效果相同);另一种方法是维护一个巳消费消息的记录(例如,和实际消费过程在同一个事务中写入 DB),消费前查询这个消息是否被消费过。这两种方法都需要使用者自己实现。

动态增减机器

NameServer 增删

NameServer 是各自独立的,在添加或删除 NameServer 后,只能能通过一些机制更新其他角色的 NameServer 地址列表即可:

  • 通过代码设置,比如Producer.setNamesrvAddr (”name-server1-ip:port;name-server2即:port”
  • 使用 Java 启动参数设置
  • 通过 Linux 环境变量设置
  • 使用者定时通过 HTTP 服务查询最新的 NameServer 列表

Broker 增删

由于业务增长,需要对集群进行扩容的时候,可以动态增加 Broker 角色的机器。只增加 Broker 不会对原有的 Topic 产生影响,原来创建好的 Topic 中数据的读写依然在原来的那些 Broker 上进行。

减少 Broker 要看是否有持续运行的 Producer,当一个 Topic 只有一个 Master Broker,停掉这个 Broker 后,消息的发送肯定会受到影响,需要在停止这个 Broker 前,停止发送消息。

当某个 Topic 有多个 Master Broker,停了其中一个,这时候是否会丢失消息呢? 答案和 Producer 使用的发送消息的方式有关,如果使用同步方式 send (msg)发送,在 DefaultMQProducer 内部有个自动重试逻辑,其中一个 Broker 停了,会自动向另一个 Broker 发消息,不会发生丢消息现象。 如果使用异步方式发送 send (msg, callback),或者用 sendOneWay 方式,会丢失切换过程中的消息。 因为在异步和 sendOneWay 这两种发送方式下,发送失败不会重试。DefaultMQProducer 默认每 30 秒到 NameServer 请求最新的路由消息,Producer 如果获取不到已停止的 Broker 下的队列信息,后续就自动不再向这些队列发送消息。

如果 Producer 程序能够暂停,在有一个 Master 和一个 Slave 的情况下也可以顺利切换。可以关闭 Producer 后关闭 Master Broker,这个时候所有的读取都会被定向到 Slave 机器,消费消息不受影响。把 MasterBroker 机器置换完后,基于原来的数据启动这个 Master Broker,然后再启动 Producer 程序正常发送消息。

用 Linux 的 kill pid 命令就可以正确地关闭 Broker, BrokerController 下有个 shutdown 函数,这个函数被加到了 ShutdownHook 里,当用 Linux 的 kill 命令时(不能用 kill -9 ), shutdown 函数会先被执行。也可以通过 RocketMQ 提供的工具(mqshutdown broker)来关闭 Broker,它们的原理是一样的。

各种故障的影响

我们期望消息队列集群一直可靠稳定地运行,但有时候故障是难免的,我们列出可能的故障情况,看看如何处理:

  1. Broker 正常关闭,启动
  2. Broker 异常 Crash,然后启动
  3. OS Crash,重启
  4. 机器断电,但能马上恢复供电
  5. 磁盘损坏
  6. CPU、主板、内存等关键设备损坏

假设现有的 RocketMQ 集群,每个 Topic 都配有多 Master 角色的 Broker 供写入,并且每个 Master 都至少有一个 Slave 机器(用两台物理机就可以实现上述配置),我们来看看在上述情况下消息的可靠性情况。

第 1 种情况属于可控的软件问题,内存中的数据不会丢失。如果重启过程中有持续运行的 Consumer, Master机器出故障后,Consumer会自动重连到对应的 Slave 机器,不会有消息丢失和偏差。当 Master 角色的机器重启以后,Consumer又会重新连接到 Master 机器(注意在启动 Master 机器的时候,如果 Consumer 正在从 Slave 消费消息,不要停止 Consumer。 假如此时先停止 Consumer 后再启动 Master机器,然后再启动 Consumer,这个时候 Consumer 就会去读 Master 机器上已经滞后的 offset 值,还记得集群模式 offset 存在 Master Broker 吗,这样造成消息大量重复)。

如果第 1 种情况出现时有持续运行的 Producer,一台 Master 出故障后,Producer 只能向 Topic 下其他的 Master 机器发送消息,如果 Producer 采用同步发送方式,不会有消息丢失。

第2、3、4种情况属于软件故障,内存的数据可能丢失,所以刷盘策略不同,造成的影响也不同,如果 Master、Slave 都配置成 SYNC_FLUSH,可以达到和第 1 种情况相同的效果。

第 5、6 种情况属于硬件故障,发生第 5、6 种情况的故障,原有机器的磁盘数据可能会丢失。如果 Master 和 Slave 机器间配置成同步复制方式,某一台机器发生 5 或 6 的故障,也可以达到消息不丢失的效果。如果机器间是异步复制,两次 Sync 间的消息会丢失。

总的来说,当设置成:

  • 多 Master,每个 Master 带有 Slave
  • 主从之间设置成 SYNC_MASTER
  • Producer 用同步方式写
  • 刷盘策略设置成 SYNC_FLUSH

就可以消除单点依赖,即使某台机器出现极端故障也不会丢消息。

消息优先级

RocketMQ 是个先人先出的队列,不支持消息级别或者 Topic 级别的优先级。业务中简单的优先级需求,可以通过间接的方式解决:

  • 拆分出多个 topic 分配不同的 Consumer 去消费
  • 一个 topic 拆分多个队列,每个队列分别消费
  • 强优先级:拆分多个 topic A,B,C,消费者在 A 消费完的情况下才会消费 B,同样 B 消费完的情况下,才去消费 C

吞吐优先方案

  • 过滤感兴趣的消息,减少网络传输 TAG、SQL、FilterServer
  • 增加 Consumer 并发度
  • 检测延时情况,跳过非重要消息
  • 增加 topic 队列
  • 并发 Producer
  • 推荐使用 EXT4 文件系统,IO 调度算法使用 deadline算法

file-system

deadline算法大致思想如下: 实现四个队列,其中两个处理正常的 read 和 write 操作,另外两个处理超时的 read 和 write 操作。正常的 read 和 write 队列中,元素按扇区号排序,进行正常的 IO 合并处理以提高吞吐量。因为 IO 请求可能会集中在某些磁盘位置,这样会导致新来的请求一直被合并,可能会有其他磁盘位置的 IO 请求被饿死。超时的 read 和 write 的队列中,元素按请求创建时间排序,如果有超时的请求出现,就放进这两个队列,调度算法保证超时(达到最终期限时间)的队列中的 IO 请求会优先被处理。

调优

这里讨论的系统是指能完成某项功能的软硬件整体,比如我们用 RocketMQ ,加上自己写的 Producer、Consumer 程序,部署到一台服务器上,组成一个消息处理系统。

首先是搭建测试环境,查看硬件利用率。把测试系统搭建好以后,要想办法模拟实际使用时的情况,并且逐步增大请求量,同时检测系统的 TPS。在请求量增大到一定程度时,系统的 QPS 达到峰值,这个时候维持这种请求量,保持系统在峰值状态下运行。然后查看此时系统的硬件使用情况:

  • 使用 top 命令查看 CPU 和内存的利用率
  • 使用 sar -n DEV 2 10 命令查看网卡使用情况
  • 使用 netstat -t 查看网卡的连接情况, 看是否有大量连接造成堵塞
  • 使用 iostat -xdm 1 查看磁盘的使用情况

经过上面的一系列检查,应该能够找到系统的瓶颈。比如瓶颈是在CPU、网卡还是磁盘?

还有一种情况是这三者都没有到使用极限,这也是一种比较常见而且有优化空间的情况,这种情况说明 CPU 利用率没有发挥出来,比如可能是锁的机制有 bug,造成线程阻塞。

对于 Java 程序来说,接下来可以用 Java 的 profiling 工具来找出程序的具体问题,比如 jvisualvm、 jstack、 perfJ 等。通过上面这些工具,可以逐步定位出是哪些 Java 线程比较慢,那个函数占用的时间多,是否因为存在锁造成了忙等的情况,然后通过不断的更改测试,找到影响性能的关键代码,最终解决问题。

参考内容

[1]《RocketMQ技术内幕》
[2]《RocketMQ实战与原理解析》
[3] 老生常谈——利用消息队列处理分布式事务
[4] RocketMQ架构解析
[5] MappedByteBuffer VS FileChannel 孰强孰弱?
[6] 文件 IO 操作的一些最佳实践
[7] 海量数据处理之Bloom Filter详解
[8] rocketmq GitHub Wiki

贝克街的流浪猫 wechat
您的打赏将鼓励我继续分享!
  • 本文作者: 贝克街的流浪猫
  • 本文链接: https://www.beikejiedeliulangmao.top/middleware/rocketmq/usage-issue/
  • 版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!
  • 创作声明: 本文基于上述所有参考内容进行创作,其中可能涉及复制、修改或者转换,图片均来自网络,如有侵权请联系我,我会第一时间进行删除。