RocketMQ相关问题
| 版本 | 内容 | 时间 |
|---|---|---|
| V1 | 新建 | 2025年04月21日22:30:51 |
RocketMQ 各个角色

Producer
- 消息发布的角色,支持分布式集群方式部署。
- Producer 通过 MQ 的负载均衡模块选择相应的 Broker 集群队列进行消息投递,投递的过程支持快速失败并且低延迟;
- 支持消息重试。
Consumer
- 支持分布式集群方式部署;
- 支持以 push 推,pull 拉两种模式对消息进行消费;
- 同时也支持集群方式和广播方式的消费;
NameServer
NameServer 是一个非常简单的 Topic 路由注册中心,支持 Topic、Broker 的动态注册与发现。
主要包括两个功能:
Broker 管理:NameServer 接受 Broker 集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查 Broker 是否还存活;
路由信息管理:每个 NameServer 将保存关于 Broker 集群的整个路由信息和用于客户端查询的队列信息。然后 Producer 和 Consumer 通过 NameServer 就可以知道整个 Broker 集群的路由信息,从而进行消息的投递和消费;
NameServer 几乎无状态节点,因此可集群部署,节点之间无任何信息同步。Broker 是向每一台 NameServer 注册自己的路由信息,所以每一个 NameServer 实例上面都保存一份完整的路由信息。当某个 NameServer 因某种原因下线了,Broker 仍然可以向其它 NameServer 同步其路由信息, Producer 和 Consumer 仍然可以动态感知 Broker 的路由的信息;
BrokerServer
Broker 主要负责消息的存储、投递和查询以及服务高可用保证,为了实现这些功能,Broker 包含了以下几个重要子模块。
- Remoting Module:整个 Broker 的实体,负责处理来自 Client 端的请求;
- Client Manager:负责管理客户端( Producer / Consumer )和维护 Consumer 的 Topic 订阅信息;
- Store Service:提供方便简单的 API 接口处理消息存储到物理硬盘和查询功能;
- HA Service:高可用服务,提供 Master Broker 和 Slave Broker 之间的数据同步功能;
- Index Service:根据特定的 Message key 对投递到Broker的消息进行索引服务,以提供消息的快速查询;
Broker 的部署:在 Master-Slave 架构中,Broker 分为 Master 与 Slave。一个 Master 可以对应多个 Slave,但是一个 Slave 只能对应一个 Master。**Master 与 Slave 的对应关系通过指定相同的 BrokerName,不同的 BrokerId 来定义,BrokerId为 0 表示 Master,非 0 表示 Slave。**Master 也可以部署多个。
RocketMQ 的 Reactor 多线程模型
RocketMQ 的 Reactor 多线程模型是其网络通信模块中采用的一种高效的异步事件驱动模型,主要用于处理客户端与 Broker 之间的网络连接和消息读写等操作,具有高性能、高可扩展性等优点。以下是其具体介绍:
- Main Reactor 线程:主要负责接收客户端的连接请求,将新连接分配给 Sub Reactor 线程,并处理一些与连接建立和销毁相关的逻辑。在 RocketMQ 中,通常只有一个 Main Reactor 线程,它会在 Broker 启动时创建并开始监听指定端口,等待客户端连接。
- Sub Reactor 线程池:由多个 Sub Reactor 线程组成,每个 Sub Reactor 线程负责处理一部分连接上的读写事件。Sub Reactor 线程从 Main Reactor 线程接收分配的连接后,会注册该连接的读事件监听,当有数据可读时,Sub Reactor 线程会读取数据并进行相应的处理,如解析消息、执行请求等。处理完成后,如果需要发送响应数据,Sub Reactor 线程会将写事件注册到事件循环中,等待合适的时机将数据发送给客户端。
- 处理器 ChannelHandler 的线程:在真正执行业务逻辑之前需要进行 SSL 验证、编解码、空闲检查、网络连接管理这些操作,这些处理器由特有的线程池的线程处理;
- 业务线程池:处理业务的线程在特有的业务线程池中执行。每种类型的请求都有自己的 code,每个 code 都会有特定的 processor 去处理,会把请求逻辑封装成一个 Runnable 的任务,提交给业务线程池去处理。这样可以避免 Sub Reactor 线程被长时间阻塞,保证其能够及时处理其他连接上的事件,提高系统的并发处理能力

优点
- 提高并发处理能力:通过多个 Sub Reactor 线程和工作线程池的配合,能够同时处理大量的客户端连接和请求,充分利用多核处理器的性能,提高系统的并发处理能力和吞吐量。
- 降低线程切换开销:Reactor 模型基于事件驱动,只有在有事件发生时才会触发相应的处理逻辑,避免了传统多线程模型中大量的线程切换开销,提高了系统的性能和响应速度。
- 增强可扩展性:可以方便地通过增加 Sub Reactor 线程或工作线程的数量来扩展系统的处理能力,以适应不同的业务负载需求。同时,各个线程之间的职责明确,便于代码的维护和扩展。
- 提高资源利用率:Sub Reactor 线程和工作线程可以根据实际的业务负载动态调整其工作状态,避免了线程的闲置和浪费,提高了系统资源的利用率。
RocketMQ 的路由信息交互

- Broker 注册:Broker 启动后,会向所有的 NameServer 发送注册请求,将自己的信息(如 Broker 名称、IP 地址、端口号、所属集群等)以及所负责的 Topic 信息注册到所有 NameServer 中。NameServer 会保存这些信息,并通过心跳机制来检测 Broker 的存活状态。(120 秒 broker 未上报心跳包,则认为 broker 宕机)
- Producer 获取路由信息:生产者和 NameServer 建立长连接,每 30 秒从 NameServer 中获取路由信息,然后再自己本地保存。包括该 Topic 分布在哪些 Broker 上,以及每个 Broker 上的队列信息等。Producer 会根据这些路由信息选择合适的 Broker 队列进行消息投递。
- Consumer 获取路由信息:消费者和 NameServer 建立长连接,每 30 秒从 NameServer 中获取路由信息,然后再自己本地保存,以了解该 Topic 的消息分布在哪些 Broker 上,从而确定从哪些 Broker 上拉取消息。
RocketMQ 的服务端和客户端
RocketMQ 中有四种角色,各个角色内部对应 Netty 服务端和客户端的角色如下:
- NameServer:服务端,因为 Broker 需要和 NameServer 维护心跳,Producer 和 Consumer 都需要从 Nameserver 获取路由信息;
- Broker(master 和 salve):服务端和客户端。和 NameServer 维护心跳时作为客户端;在与 Producer 和 Consumer 交互时是作为服务端的;
- Producer:客户端。从 NameServer 拉取路由信息;发送消息到 Broker;
- Consumer:客户端。从 NameServer 拉取路由信息;从 Broker 拉取消息;
broker是如何注册到namesrv?如何保活的?怎么认为broker宕机?
Broker 注册到 NameSrv 的过程
- 发送注册请求:Broker 启动后,会向集群中的所有 NameSrv 实例发送注册请求。请求中包含 Broker 的地址(IP 和端口)、BrokerId、Broker 名称、所属集群名称、主节点地址(如果是从节点)、消息过滤服务器列表、Topic 配置等信息4。
- 获取写锁并更新数据结构:NameSrv 收到注册请求后,会使用可重入写锁来确保在更新内部数据结构时的同步控制。从相关表中查找给定集群名对应的 Broker 集合,若不存在则创建。将当前注册的 Broker 名称加入集合,获取或初始化 BrokerData 对象来存储 Broker 集群、名称及地址映射信息,更新 Broker 地址映射表,并在 brokerLiveTable 中保存 Broker 的实时连接信息,包括当前时间戳、主题配置版本、通道以及高可用服务器地址。
Broker 在 NameSrv 的保活机制
Broker 通过定时发送心跳包来保持在 NameSrv 中的存活状态。Broker 启动后,会每隔 30 秒向集群中所有的 NameSrv 发送心跳包。心跳包的 header 中保存当前 Broker 的信息,body 保存 Topic 信息。NameSrv 收到 Broker 心跳时,会更新 brokerLiveTable 中对应 Broker 的最后更新时间戳等信息。
NameSrv 判断 Broker 宕机的方式
NameSrvController 初始化时会启动定时线程池,其中包括一个每隔 10 秒检测一次不健康 Broker 的 schedule 线程池。在定时任务中,NameSrv 会遍历 brokerLiveTable,如果当前时间大于 Broker 最后更新时间加上超时时间(默认是 120 秒),就认为 Broker 宕机。NameSrv 会主动关闭与该 Broker 的 channel,并发送 Broker 注销请求,将其从路由信息中剔除。
另外,如果 Broker 主动下线,则会
- NameSrv 感知与更新:Broker 在关机前会向 NameSrv 发送注销请求,告知 NameSrv 自己即将下线。NameSrv 收到请求后,会更新路由信息,将该 Broker 从可用 Broker 列表中移除,并向 Producer 和 Consumer 发送路由信息变更通知。如果 Broker 异常关机,没有及时发送注销请求,NameSrv 会通过心跳检测机制发现 Broker 宕机,也会更新路由信息。
- Producer 和 Consumer 调整:当某个主题的路由信息发生改变了,NameServer 不会主动推送新的路由信息到客户端。客户端会定时的从 NameServer 中拉取最新的路由信息。这样设计的目的是为了降低 NameServer 的实现复杂性,避免在路由信息变更时需要维护大量的推送连接和状态
生产者消费者如何获取的路由信息
与 NameServer 建立长连接
- Producer 和 Consumer 启动时,会根据配置的 NameServer 地址列表,与 NameServer 集群中的节点建立长连接。这个连接用于后续的路由信息获取以及与 NameServer 的其他交互。
定时拉取路由信息
- Producer 和 Consumer 会定时从 NameServer 拉取路由信息。以 Producer 为例,在发送第一条消息时,会根据 Topic 从 NameServer 获取路由信息,包括 Topic 下面有哪些 Queue,这些 Queue 分布在哪些 Broker 上等。之后,Producer 和 Consumer 会按照一定的时间间隔(默认 30 秒),周期性地从 NameServer 拉取最新的路由信息,以保证获取到最新的 Broker 状态和 Topic 路由变化。例如,Consumer 在订阅某个主题的消息之前,会从 NameServer 获取 Broker 服务器地址列表。
本地缓存路由信息
- Producer 和 Consumer 获取到路由信息后,会将其缓存到本地。这样在后续发送或消费消息时,可以直接从本地缓存中获取 Broker 地址等信息,提高消息处理的效率。当本地缓存的路由信息过期或检测到路由变化时,再从 NameServer 拉取新的路由信息进行更新。
消息的类型
RocketMQ 消息发送在发送方式上分为三种
- 同步发送;
- 异步发送;
- 单向发送;
RocketMQ 在发送消息的类型分为以下几种
- 普通消息;
- 顺序消息;
- 延迟消息;
- 事务消息;
- 单向消息;
- 批量消息;
消息发送的 Broker 故障规避机制
Broker 故障规避机制与生产者感知延迟的关系
- 由于生产者是每隔 30 秒去 NameServer 拉取 topic 路由管理信息,而 NameServer 不会主动推送路由变化信息,所以当 Broker 出现故障时,生产者不能立即感知到,这就导致了生产者感知 topic 路由信息变化存在延迟。在这段延迟时间内,生产者可能会继续向故障的 Broker 发送消息,从而导致消息发送失败。因此,为了减少这种延迟带来的影响,Broker 的故障规避机制就显得尤为重要,它可以在一定程度上降低因 Broker 故障而导致消息发送失败的概率。
消息级别的故障规避机制
- 重试策略:在 RocketMQ 中,无论是同步消息还是异步消息,当发送失败时都会进行重试。这是一种常见的容错机制,有助于提高消息发送的成功率。
- 避免向故障 Broker 重复发送:在重试时,默认情况下不会再往发送失败的 Broker 发送消息,除非其他 Broker 上没有该 topic 所在的队列。这样可以避免持续向故障的 Broker 发送消息,减少消息发送的阻塞和失败率,提高整个系统的可靠性和稳定性。
生产者级别的故障规避机制
- 配置参数:将
sendLatencyFaultEnable配置设为true后,生产者会根据消息发送消耗的时间来判断 Broker 的可用性。 - Broker 可用性判断:设定一个时间阈值 A,当消息发送时间小于 A 时,认为 Broker 正常;当消息发送时间大于 A 时,则认为 Broker 可能存在问题。对于被认为有问题的 Broker,在一段时间内不允许向其发送消息,这段时间的长短通常根据具体的业务需求和系统情况来设定。这种机制可以让生产者动态地感知 Broker 的性能变化,及时避开性能不佳的 Broker,从而提高消息发送的效率和成功率。
broker 的存储目录
store
├── abort
├── checkpoint
├── commitlog
│ └── 00000000000000000000
├── config
│ ├── consumerFilter.json
│ ├── consumerFilter.json.bak
│ ├── consumerOffset.json
│ ├── consumerOffset.json.bak
│ ├── delayOffset.json
│ ├── delayOffset.json.bak
│ ├── subscriptionGroup.json
│ ├── subscriptionGroup.json.bak
│ ├── topics.json
│ └── topics.json.bak
├── consumequeue
│ ├── ScheduledTopic
│ │ ├── 0
│ │ │ └── 00000000000000000000
│ │ ├── 1
│ │ │ └── 00000000000000000000
│ │ ├── 2
│ │ │ └── 00000000000000000000
│ │ └── 3
│ │ └── 00000000000000000000
│ ├── SCHEDULE_TOPIC_XXXX
│ │ ├── 1
│ │ │ └── 00000000000000000000
│ │ └── 2
│ │ └── 00000000000000000000
│ ├── TopicTest
│ │ ├── 0
│ │ │ └── 00000000000000000000
│ │ ├── 1
│ │ │ └── 00000000000000000000
│ │ ├── 2
│ │ │ └── 00000000000000000000
│ │ └── 3
│ │ └── 00000000000000000000
│ └── TopicTest2
│ ├── 0
│ │ └── 00000000000000000000
│ ├── 1
│ │ └── 00000000000000000000
│ ├── 2
│ │ └── 00000000000000000000
│ └── 3
│ └── 00000000000000000000
├── index
│ └── 20220411232202751
└── lockRocketMQ 中 Broker 的存储目录默认是${ROCKRTMQ_HOME}/store3。主要包含以下子目录5:
- commitlog:消息的存储目录,所有消息都会顺序写入该目录下的文件中,文件名由该文件保存消息的最大物理偏移值在高位补 0 组成,每个文件大小默认 1GB,可以通过
mapedFileSizeCommitLog进行配置。 - consumequeue:消息消费队列存储目录,包含该 Broker 上所有的 Topic 对应的消费队列文件信息,格式为 “
./consumequeue/Topic名字/queue id/具体消费队列文件”。每个消费队列是 commitlog 的一个索引,用于消费者拉取消息、更新位点。因为 commitlog 是顺序写的,假如没有索引的话需要从头开启遍历查找消息,比较耗时; - index:消息索引文件存储目录,全部文件都是按照消息 key 创建的 Hash 索引,为通过 key 或时间区间查询消息提供支持。文件名是创建的时间戳命名的
- config:保存当前 Broker 中全部的 Topic 的路由信息、订阅关系和消费进度等数据,这些数据会定时从内存持久化到磁盘,以便宕机后恢复。
- consumerFilter.json:主题消息过滤信息;
- consumerOffset.json:集群消费模式下的消息消费进度;
- delayOffset.json:延迟消息队列拉取进度;
- subscriptionGroup.json:消息消费组的配置信息;
- topics.json:topic 配置属性(主题路由信息);
- lock:存储模块中的加锁文件,作为锁对象;
- abort:标记 broker 是否是异常关闭。正常关闭 broker 时会删除这个文件,异常关闭时不会删除这个文件。当 broker 重启时,根据是否存在这个文件决定是否重新构建 index 索引等操作;
- checkpoint:存储着 commitLog、consumerqueue、index 文件的最后刷盘时间戳,文件固定长度 4k,只用了前 24 个字节。
RocketMQ 的存储架构

消息存储架构图中主要有下面三个跟消息存储相关的文件构成。CommitLog文件、ConsumeQueue文件、Index文件:
(1) CommitLog:消息主体以及元数据的存储主体,存储 Producer 端写入的消息主体内容,消息内容不是定长的。单个文件大小默认 1G,文件名长度为 20 位,左边补零,剩余为起始偏移量,比如 00000000000000000000 代表了第一个文件,起始偏移量为 0,文件大小为 1G = 1073741824;当第一个文件写满了,第二个文件为 00000000001073741824,起始偏移量为 1073741824 ,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件;
因为 CommitLog 的文件是存储在磁盘上的,为了提高写入消息的性能,RocketMQ 使用了 mmap 将磁盘文件映射到内存中。映射到的内存到 page cache,现代的操作系统内核被设计为按照 Page 读取文件,每个 Page 的默认大小是 4kb,如果读取的内容名字了 page cache 就直接返回,不会再次读取磁盘了。但是 page cache 也有脏页回写、内存回收、内存置换等情况,RocketMQ 通过内存预热、设定内存不置换等措施来优化。
TODO-KWOK 上图,那本书上有图,抄一个即可
(2) ConsumeQueue:消息消费队列,引入的目的主要是提高消息消费的性能。由于 RocketMQ 是基于主题 topic 的订阅模式,消息消费是针对主题进行的,如果要遍历 commitlog 文件,根据 topic 检索消息是非常低效的。Consumer 可根据 ConsumeQueue 来查找待消费的消息。其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定 topic 下的队列消息在 CommitLog 中的起始物理偏移量 offset,消息大小 size 和消息 Tag 的 HashCode 值。consumequeue 文件可以看成是基于 topic 的 commitlog 索引文件,故 consumequeue 文件夹的组织方式如下:topic/queue/file 三层组织结构,具体存储路径为: $HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同样 consumequeue 文件采取定长设计,每一个条目共 20 个字节,分别为 8 字节的 commitlog 物理偏移量、4 字节的消息长度、8 字节 tag hashcode,存储 tag 的哈希码的原因是为了保证每个条目的长度一致,可以使用类似数组下标快速访问条目。单个文件由 30W 个条目组成,可以像数组一样随机访问每一个条目,每个 ConsumeQueue 文件大小约 5.72M;
消息消费者根据 topic、消息消费进度(ConsumeQueue 逻辑偏移量),也就是第几个 ConsumeQueue 条目,类似数组的索引,这样的消费进度去访问消息,通过逻辑偏移量 logicOffset × 20,即可找到该条目的起始偏移量(ConsumeQueue 文件中的偏移量),然后读取该偏移量后 20 个字节即可得到一个条目,无须遍历 ConsumeQueue 文件。

(3) IndexFile:IndexFile(索引文件)提供了一种可以通过 key 或时间区间来查询消息的方法,主要存储消息 key 与 offset 的对应关系。Index 文件的存储位置是:$HOME/store/index/{fileName},文件名 fileName 是以创建时的时间戳命名的,固定的单个 IndexFile 文件大小约为 400M,一个 IndexFile 可以保存 2000W 个索引,IndexFile 的底层存储设计为在文件系统中实现 HashMap 结构,故 RocketMQ 的索引文件其底层实现为 hash 索引。
TODO-KWOK 上图,那本书上有图,抄一个即可
在上面的 RocketMQ 的消息存储整体架构图中可以看出,RocketMQ 采用的是混合型的存储结构,即为 Broker 单个实例下所有的队列共用一个日志数据文件(即为 CommitLog )来存储。RocketMQ 的混合型存储结构(多个 Topic 的消息实体内容都存储于一个 CommitLog 中)针对 Producer 和 Consumer 分别采用了数据和索引部分相分离的存储结构,Producer 发送消息至 Broker 端,然后 Broker 端使用同步或者异步的方式对消息刷盘持久化,保存至 CommitLog 中。只要消息被刷盘持久化至磁盘文件 CommitLog 中,那么 Producer 发送的消息就不会丢失。正因为如此,Consumer 也就肯定有机会去消费这条消息。当无法拉取到消息后,可以等下一次消息拉取,同时服务端也支持长轮询模式,如果一个消息拉取请求未拉取到消息,Broker 允许等待 30s 的时间,只要这段时间内有新消息到达,将直接返回给消费端。这里,RocketMQ 的具体做法是,使用 Broker 端的后台服务线程 — ReputMessageService 不停地分发请求并异步构建 ConsumeQueue(逻辑消费队列)和 IndexFile(索引文件)数据。
TODO-KWOK consumerQueue 和 index 文件的生成时机
RocketMQ 针对存储做的优化
顺序写入
- 原理:RocketMQ 的 CommitLog 采用顺序写入的方式存储消息。在底层,它通过
MappedFileQueue来管理一系列的MappedFile,消息会被顺序地写入到MappedFile中。 - 优势:顺序写入大大减少了磁盘寻道时间,提升了写入性能。与随机写入相比,顺序写入可以充分利用磁盘的顺序读写特性,提高磁盘 I/O 的效率,从而能够支持高并发的消息写入。
内存映射文件(Memory - Mapped File)
- 原理:RocketMQ 使用内存映射文件来实现文件的读写操作。通过
MappedByteBuffer将文件映射到内存地址空间,这样可以直接在内存中对文件进行操作,而不需要通过传统的read()和write()系统调用。 - 优势:一方面,内存映射文件减少了数据在用户空间和内核空间之间的拷贝次数,提高了数据读写的效率。另一方面,操作系统会自动对内存映射文件进行缓存管理,将最近访问过的页面缓存在内存中,进一步加速了文件的访问。
零拷贝(Zero - Copy)
- 原理:在消息发送和消费过程中,RocketMQ 利用了零拷贝技术。例如,在发送消息时,通过
FileChannel的transferTo()方法将文件内容直接发送到网络通道,而不需要将数据先拷贝到用户空间的缓冲区。 - 优势:零拷贝技术避免了数据的冗余拷贝,减少了 CPU 和内存的开销,提高了消息传输的性能和效率,尤其是在处理大文件或高并发的消息传输场景下,效果更为显著。
零拷贝是一种优化技术,旨在减少数据在计算机内存中复制的次数,从而提高 I/O 操作的效率和性能2。以下是对 RocketMQ 中零拷贝技术的介绍:
零拷贝原理
- 传统的数据传输,如从磁盘读取数据并通过网络发送,数据要从磁盘读到内核缓冲区,再从内核缓冲区拷贝到用户态缓冲区,接着又拷贝回内核态以便发送到网络,涉及多次数据拷贝和上下文切换。而零拷贝通过利用
mmap、sendfile等系统调用,避免了从内核到用户空间的数据复制,减少了 CPU 和内存带宽的消耗。
RocketMQ 中的零拷贝实现
- RocketMQ 采用内存映射(
mmap)的方式实现零拷贝。mmap将磁盘文件映射到进程的内存地址空间中,这样通过访问内存就可以直接对文件进行操作,而不需要显式的 I/O 操作。在 RocketMQ 中,消息被持久化到磁盘后,通过mmap映射到内存中。当有消费者需要读取消息时,RocketMQ 可以直接从内存映射区域中读取消息数据,避免了将数据从内核态拷贝到用户态的过程。不过在发送数据到网络时,RocketMQ 没有像 Kafka 那样直接使用sendfile,因此其零拷贝主要体现在文件读写阶段,网络传输上仍存在数据拷贝操作。
零拷贝技术的优势
- 性能提升:减少了 CPU 使用,提高 I/O 操作的性能。
- 降低延迟:减少了数据复制的延迟,从而使数据传输更加高效。
- 减小内存带宽压力:减少了内存数据传输,提高系统整体的吞吐量。
消息索引优化
- 原理:RocketMQ 构建了多种索引结构来加速消息的查询和消费。除了常规的 ConsumeQueue 作为消息消费队列的索引外,还提供了 IndexFile 用于根据消息的 key 或时间范围来查询消息。IndexFile 采用了哈希索引和跳跃表等数据结构来实现快速的索引查找。
- 优势:通过这些索引结构,消费者可以快速定位到需要消费的消息,提高了消息的拉取速度和消费效率。同时,索引的存在也使得在进行消息查询和回溯时更加高效,能够满足不同业务场景下对消息快速定位的需求。
刷盘策略优化
- 原理:RocketMQ 提供了同步刷盘和异步刷盘两种策略,并且在源码层面进行了精细的控制和优化。同步刷盘通过
GroupCommitService线程来确保消息及时、可靠地刷写到磁盘;异步刷盘则通过FlushRealTimeService线程按照一定的时间间隔或消息积累量来批量刷盘。 - 优势:这种设计允许用户根据业务对消息可靠性和性能的要求进行灵活配置。同步刷盘保证了消息的强一致性和可靠性,但会降低一定的性能;异步刷盘则在保证一定可靠性的前提下,通过批量操作提高了刷盘的性能和效率,满足了不同业务场景下的需求。
存储文件预分配
- 原理:在 RocketMQ 启动时,会预先分配 CommitLog 和 ConsumeQueue 等存储文件的空间。通过
MappedFile的构造函数,会在指定的存储目录下创建指定大小的文件,并将其映射到内存中。 - 优势:这样避免了在运行过程中频繁地创建和扩展文件,减少了文件系统的碎片,提高了文件的读写性能。同时,预分配空间也使得文件的存储位置相对连续,有利于提高磁盘 I/O 的效率。
PageCache
页缓存(PageCache)机制概述
页缓存(PageCache)是操作系统对文件的缓存机制,其目的在于加速文件的读写操作。在该机制下,操作系统会将一部分内存用作 PageCache。一般而言,程序对文件进行顺序读写的速度近乎于内存的读写速度,这主要得益于操作系统借助 PageCache 机制对读写访问操作进行了性能优化。
PageCache 的读写原理
对于数据写入,操作系统不会直接将数据写入物理磁盘,而是先将数据写入至 PageCache 内,随后再由 pdflush 内核线程以异步的方式将 PageCache 内的数据刷盘至物理磁盘上。而在数据读取方面,若一次读取文件时未命中 PageCache,操作系统在从物理磁盘上访问读取文件的同时,会对其他相邻块的数据文件进行顺序预读取,以便后续的读取操作能够更快地命中缓存。每个 Page 的默认大小为 4KB。
PageCache 的局限性
PageCache 虽然带来了显著的性能提升,但也存在一定缺点。当操作系统执行脏页回写、内存回收、内存交换等操作时,会引发较大的消息读写延迟。因为这些操作可能会占用系统资源,影响数据在 PageCache 与物理磁盘之间的传输,进而影响到依赖 PageCache 进行读写的应用程序的性能。
RocketMQ 对 PageCache 的写入性能优化策略
在 RocketMQ 中,Broker 在处理消息存储时,充分利用了 PageCache 的特性。**当 Broker 将数据写入 CommitLog 文件时,数据并非直接写入底层的物理磁盘文件,而是先进入操作系统的 PageCache 内存缓存中。后续,由操作系统的后台线程异步将 PageCache 中的数据刷入底层的磁盘文件中。**这种磁盘文件顺序写 + OS PageCache 写入 + OS 异步刷盘的策略,使得 RocketMQ 在消息写入性能上表现出色。一方面,顺序写减少了磁盘寻道时间,提高了写入效率;另一方面,PageCache 的缓存作用进一步减少了磁盘 I/O 次数,异步刷盘则在保证数据最终持久化的同时,避免了同步刷盘带来的性能损耗。
PageCache 对 RocketMQ ConsumeQueue 的性能加持
在 RocketMQ 的架构中,ConsumeQueue 作为逻辑消费队列,存储的数据量相对较少,并且其读取操作具有顺序性的特点。在 PageCache 机制的预读取作用下,操作系统会提前将相邻的数据块读取到 PageCache 中,这使得 ConsumeQueue 文件的读性能几乎接近读内存。即使在消息堆积的情况下,由于其顺序读取的特性,依然能够从 PageCache 中快速获取数据,不会对性能产生明显影响。
RocketMQ CommitLog 文件的读取性能挑战与优化
然而,对于 CommitLog 消息存储的日志数据文件来说,情况有所不同。在读取消息内容时,由于消息的存储和读取并非总是顺序的,会产生较多的随机访问读取操作。这些随机访问读取操作无法充分利用 PageCache 的预读取优势,需要频繁地从物理磁盘读取数据,严重影响了性能。为了改善这一情况,如果存储设备采用 SSD,并选择合适的系统 IO 调度算法,比如设置调度算法为 “Deadline”,随机读的性能会有所提升。“Deadline” 调度算法会为每个 I/O 请求设置一个截止时间,优先处理那些即将超过截止时间的请求,从而减少了 I/O 请求的响应时间,提高了随机读的性能,使 RocketMQ 在处理 CommitLog 文件的读取操作时更加高效。
broker 刷盘机制

同步刷盘
- 生产者发送消息到 Broker 后,消息首先被写入到内存的 PageCache 中。然后会将这些消息加入到一个待刷盘的组中,然后开始等待刷盘操作完成。只有当消息真正持久化到磁盘的物理文件后,Broker 才会向生产者返回消息发送成功的响应。
- 要等待落盘完成才响应给 producer,所有吞吐量也是最差的;
异步刷盘
- 将 PageCache 中的消息数据异步地持久化到磁盘。生产者发送消息后,Broker 把消息写入 PageCache 就立即向生产者返回发送成功的响应,后续由后台线程按照一定的时间间隔或者消息积累量来批量执行刷盘操作。
- 但是如果写到 page cache,宿主机崩溃了,这部分数据就丢失了。这时系统吞吐量虽然高了,但是有丢失数据的风险;
异步刷盘-内存级读写分离机制
- 消息首先会被写入到直接内存(Direct Memory),然后通过 commit 操作将堆外内存中的消息数据转移到操作系统的 PageCache 中,最后依赖操作系统的异步刷盘机制,将 PageCache 中的数据持久化到磁盘。
- 在写 page cache 和刷盘这两步都有丢失数据的风险;
| 刷盘机制 | 异步实时刷盘 | 异步刷盘(内存读写分离) | 同步刷盘 |
|---|---|---|---|
| 数据一致性 | 中 | 低 | 高 |
| 数据可靠性 | 低 | 低 | 高 |
| 数据可用性 | 中 | 低 | 高 |
| 系统吞吐量 | 高 | 高 | 低 |
broker过期文件删除机制
由于内存和磁盘都是有限的资源,Broker 不可能永久地保存所有数据,所以一些超过保存期限的数据会被定期删除。在 RocketMQ 中有 commitlog、ConsumeQueue、Index 这三种重要的文件。
RocketMQ 目前有两个情况会触发删除 commitLog 文件:
- 假如 commitLog 文件最后一次更新时间距离当前已经超过 72 小时了(默认值,可配置);
- 假如 commitLog 文件所在的磁盘空间超过 85% 时(默认值,可配置),也会触发删除操作。
因为 CommitLog 文件会过期,那么其对应的 ConsumeQueue 和 Index 文件就没有必要再保留了,也是会删除。
生产者端负载均衡
原理
生产者端的负载均衡是指在发送消息时,如何将消息均匀地分发到多个 Broker 的多个队列上。这样可以充分利用各个队列的资源,提高消息发送的吞吐量和性能。
实现方式
- 轮询策略:这是 RocketMQ 默认的消息队列选择策略。生产者会按照顺序依次选择 Topic 下的各个队列进行消息发送。例如,对于一个包含 4 个队列的 Topic,生产者会依次将消息发送到队列 0、队列 1、队列 2、队列 3,然后再回到队列 0 继续循环。
- 随机策略:生产者随机选择一个队列来发送消息。这种策略可以在一定程度上避免某些队列负载过高,但可能会导致消息分布不够均匀。
- 根据消息的 key 进行哈希选择:生产者可以根据消息的 key 计算哈希值,然后根据哈希值选择对应的队列。这样可以保证具有相同 key 的消息总是被发送到同一个队列中,适用于需要保证消息顺序性的场景。
消费者端负载均衡
消息队列的负载均衡和重分布是由 RebalanceService 处理的。这个后台线程服务会每隔 20 秒钟去处理消息队列的负载均衡和重分布。
RocketMQ 提供五种队列分配策略,这里分析两种常用的。
平均分配

假如某个 topic 有四个队列
- 假如消费者组中有 2 个消费者:每个消费者分两个队列;
- 假如消费者组中有 3 个消费者:第一个消费者分两个队列,剩下两个消费者分别消费一个队列;
- 假如消费者组中有 4 个消费者:每个消费者分一个队列;
- 假如消费者组中有 5 个消费者:因为只有四个队列,所以最后一个消费者无法消费;
消息队列分配原则为一个消费者可以分配多个消息队列,但同一个消息队列只会分配给一个消费者,如果消费者个数大于消息队列数量,则有些消费者无法消费消息。
平均环形分配

负载均衡的动态调整
- 消费者实例的动态增减:当消费者组中有新的消费者实例加入或有消费者实例退出时,RocketMQ 会自动触发负载均衡的重新分配。新加入的消费者实例会从其他消费者实例中获取部分队列进行消费,而退出的消费者实例的队列会被重新分配给其他消费者实例。
- 队列的动态增减:当 Topic 的队列数量发生变化时,RocketMQ 也会重新进行负载均衡,以确保队列能够被合理地分配给消费者实例。
影响负载均衡的因素
- 消费者实例的性能差异:如果消费者组中的各个消费者实例的性能差异较大,可能会导致部分消费者实例处理能力不足,而部分消费者实例处理能力过剩。可以通过监控消费者实例的性能指标,如消息处理延迟、吞吐量等,对消费者实例进行调整或优化。
- 网络延迟:网络延迟可能会影响消息的传输速度和消费者实例的响应时间,从而影响负载均衡的效果。可以通过优化网络环境、选择合适的部署位置等方式来减少网络延迟的影响。
说说 Topic 和 Queue 的区别与联系?
概念维度
- Topic:是逻辑概念,作为消息的一种分类方式。生产者依据消息的业务属性,把消息发送到对应的 Topic 里。
- Queue:属于物理概念,是真正存储消息的地方。它是消息存储和消费的基本单元,就像一个个小格子,把消息有序存放。
功能用途
- Topic:主要用于消息的组织与管理,生产者通过它对消息分类,消费者则依据自身业务需求订阅相应 Topic 来获取消息。
- Queue:一方面负责存储消息,另一方面实现负载均衡和并行消费。一个 Topic 下的多个 Queue 可分布在不同 Broker 上,消费者能并行从不同 Queue 消费消息,提升系统吞吐量。
包含关系
- 一个 Topic 由多个 Queue 组成,Queue 是 Topic 物理存储的具体实现。生产者发到 Topic 的消息会分散存于该 Topic 下的各个 Queue 中。
协作完成消息传递
- Queue 为 Topic 的消息传递提供基础支撑。消费者订阅 Topic 后,实际上是从该 Topic 下的 Queue 拉取消息进行消费。
负载均衡实现
- 多个 Queue 实现了 Topic 消息的负载均衡。生产者将消息均匀发往各个 Queue,消费者并行从不同 Queue 消费,提高系统性能。例如,一个有 4 个 Queue 的 Topic,多个消费者可分别从不同 Queue 拉取消息,实现并行处理。
简述 RocketMQ 消息的生产和消费流程?
消息生产流程
- 生产者启动:生产者进程启动后,会与 NameServer 建立长连接,从 NameServer 获取 Broker 的元数据信息,包括 Broker 的地址、Topic 对应的队列分布等。
- 选择队列:生产者在发送消息时,会根据配置的负载均衡策略(如轮询、随机、根据消息 key 哈希等)选择一个合适的队列。例如,若采用轮询策略,生产者会依次选择 Topic 下的各个队列。
- 发送消息:生产者将消息发送到选择的队列所在的 Broker 节点。消息会先被写入 Broker 的内存(PageCache),之后根据刷盘策略(同步刷盘或异步刷盘)将消息持久化到磁盘。
- 接收响应:Broker 处理完消息写入操作后,会向生产者返回发送结果。如果是同步刷盘,Broker 会在消息真正持久化到磁盘后才返回响应;如果是异步刷盘,Broker 会在消息写入 PageCache 后立即返回响应。
消息消费流程
- 消费者启动:消费者进程启动后,同样会与 NameServer 建立长连接,获取 Broker 的元数据信息。然后,消费者会向 Broker 发送订阅请求,告知 Broker 自己感兴趣的 Topic。
- 负载均衡:在集群消费模式下,消费者组中的各个消费者实例会根据负载均衡策略(如平均分配、一致性哈希等)对 Topic 下的队列进行分配,确保每个消费者实例负责消费一部分队列。在广播消费模式下,每个消费者实例会消费 Topic 下的所有队列。
- 拉取消息:消费者根据分配到的队列,向对应的 Broker 节点发送拉取消息的请求。Broker 接收到请求后,会从队列中读取消息并返回给消费者。
- 消费消息:消费者接收到消息后,会进行业务逻辑处理。处理完成后,消费者会向 Broker 提交消费进度,以便在下次启动时能够从正确的位置继续消费消息。
流程交互与协调
- NameServer 的作用:NameServer 作为 RocketMQ 的元数据管理中心,为生产者和消费者提供 Broker 的地址信息和 Topic 的路由信息,保证生产者和消费者能够找到正确的 Broker 节点进行消息的发送和接收。
- Broker 的角色:Broker 负责消息的存储和转发,接收生产者发送的消息并将其持久化到磁盘,同时根据消费者的请求提供消息服务。
- 消息的顺序性:在顺序消息的场景下,生产者会将具有相同顺序要求的消息发送到同一个队列中,消费者按顺序从队列中消费消息,从而保证消息的顺序性。
RocketMQ 如何实现高可用架构?
RocketMQ 通过多组件协同与多种机制保障,构建了高可用架构,主要体现在 NameServer、Broker 等组件的设计上,以下是详细介绍:
NameServer 多节点部署
NameServer 是 RocketMQ 的元数据管理中心,为实现高可用,采用多节点部署模式。
- 无状态设计:每个 NameServer 节点都是无状态的,它们之间相互独立,不进行数据交互与同步。这意味着单个 NameServer 节点的故障不会影响其他节点的正常运行。
- Broker 多注册:Broker 会向所有 NameServer 节点注册自身信息,包括 Topic、队列等元数据。生产者和消费者在启动时,会从多个 NameServer 节点获取 Broker 的元数据信息。这样,当某个 NameServer 节点出现故障时,生产者和消费者仍可从其他正常的 NameServer 节点获取所需信息,确保系统能够继续正常工作。
Broker 主从复制与故障转移
主从架构设计
Broker 采用主从架构,一个主 Broker(Master)可以对应多个从 Broker(Slave)。主 Broker 负责处理消息的读写操作,从 Broker 则从主 Broker 同步消息数据,实现数据的备份。
同步与异步复制
- 同步复制:主 Broker 在收到生产者发送的消息后,会等待从 Broker 同步该消息成功后,才向生产者返回消息发送成功的响应。这种方式能保证主从 Broker 的数据一致性,但会增加消息发送的延迟。
- 异步复制:主 Broker 收到消息后,立即向生产者返回响应,然后异步地将消息同步给从 Broker。这种方式可以提高消息发送的性能,但在主 Broker 故障时,可能会有少量消息未同步到从 Broker。
故障转移
当主 Broker 出现故障时,RocketMQ 可以通过手动或自动的方式进行故障转移。在自动故障转移场景下,通过监控系统检测到主 Broker 故障后,会将从 Broker 提升为主 Broker,继续提供服务,确保消息的正常读写。
消息重试与补偿机制
生产者消息重试
生产者在发送消息时,如果遇到网络异常、Broker 繁忙等问题导致消息发送失败,会进行重试。通过设置合理的重试次数和重试间隔,确保消息能够成功发送到 Broker。
消费者消息重试
消费者在消费消息时,如果处理失败,RocketMQ 会将消息重新放回队列,让消费者进行重试。可以根据业务需求设置重试次数和重试策略,避免因短暂的异常导致消息处理失败。
负载均衡
生产者负载均衡
生产者在发送消息时,会根据负载均衡策略(如轮询、随机、根据消息 key 哈希等)选择合适的队列进行消息发送。这样可以将消息均匀地分布到多个 Broker 节点上,避免单个 Broker 节点负载过高。
消费者负载均衡
在集群消费模式下,消费者组中的各个消费者实例会根据负载均衡策略(如平均分配、一致性哈希等)对 Topic 下的队列进行分配,确保每个消费者实例负责消费一部分队列,提高消费的效率和性能。
RocketMQ 出现消息堆积时,如何处理?
消费者处理能力不足
- 增加消费者实例:在集群消费模式下,可以通过增加消费者实例的数量来提高整体的消费能力。例如,原本有 2 个消费者实例,可以增加到 4 个,让更多的实例并行消费消息,从而加快消费速度。
- 优化消费者代码:检查消费者的业务逻辑代码,看是否存在耗时过长的操作,如数据库查询、复杂计算等。可以对这些操作进行优化,例如使用缓存、异步处理等方式,减少单条消息的处理时间。
- 调整消费线程数:适当增加消费者的消费线程数,提高单个消费者实例的并发消费能力。但要注意,线程数过多可能会导致系统资源竞争激烈,反而降低性能,需要根据实际情况进行调整。
注意:增加消费者实例不一定有效,还得看消息队列的个数够不够。
比如总共只有 4 个 queue,有 5 个消费者的话,其中有一个消费者是不消费消息的。
Broker 性能瓶颈
- 增加 Broker 节点:通过水平扩展 Broker 节点,将消息分散到多个 Broker 上,减轻单个 Broker 的负载压力。
- 优化 Broker 配置:调整 Broker 的配置参数,如刷盘策略、内存分配等,提高 Broker 的性能。例如,将刷盘策略从同步刷盘改为异步刷盘,可以提高消息的写入性能。
RocketMQ 和 Kafka 相比,有哪些优缺点?
RocketMQ 优点:
- 功能特性丰富:支持事务消息、顺序消息等高级特性。事务消息可保证分布式系统中数据的最终一致性,适用于金融等对数据一致性要求高的场景;顺序消息能确保消息按照发送顺序被消费,满足如订单状态流转等业务需求。
- 强大的运维管理能力:提供了直观的控制台和丰富的运维工具,方便进行集群管理、监控和故障排查。例如,可实时查看消息的生产和消费情况、Broker 的状态等。
- 良好的中文文档和社区支持:对于国内开发者来说,学习和使用成本较低,遇到问题能更容易找到相关的资料和解决方案。
Kafka 优点:
- 超高的吞吐量:采用了分区、批量读写和零拷贝等技术,在处理海量数据时表现出色,每秒可处理数百万条消息,适合大数据、日志收集等场景。
- 生态系统完善:与众多大数据生态系统如 Hadoop、Spark 等有良好的集成,方便进行数据的存储、分析和处理。
- 分布式扩展性强:能够轻松应对大规模的集群扩展,可通过增加 Broker 节点来提高系统的处理能力。
RocketMQ 缺点:
- 吞吐量相对较低:在处理超大规模数据时,其吞吐量不如 Kafka,可能无法满足一些对吞吐量要求极高的场景。
- 生态系统相对较窄:与大数据生态系统的集成不如 Kafka 广泛,在大数据处理方面的应用场景相对受限。
Kafka 缺点:
- 功能特性相对较少:对事务消息、顺序消息等高级特性的支持不如 RocketMQ 完善,在一些对消息处理逻辑要求复杂的场景下使用不便。
Kafka 适用场景
- 大数据领域:凭借其高吞吐量和完善的生态系统,适合用于日志收集、数据采集和实时数据分析等场景。
- 流式处理:与流式处理框架的良好集成,比如 flink,使其成为构建实时数据处理管道的理想选择。