RocketMQ 消息存储机制探秘
RocketMQ 消息存储基础结构
RocketMQ 的消息存储是其核心功能之一,它确保了消息的可靠持久化与高效读取。其存储结构主要由以下几个关键部分组成:
1. CommitLog
CommitLog 是 RocketMQ 存储消息的核心文件,所有主题的消息都顺序写入到这个文件中。这种设计避免了像传统数据库那样随机 I/O 的性能瓶颈,大大提高了写入效率。每个 CommitLog 文件默认大小为 1G,当一个文件写满后,会自动切换到下一个文件继续写入。
例如,假设当前 CommitLog 文件名为 commitlog-00000000000000000000
,当它写满后,下一个文件将命名为 commitlog-00000000001073741824
。这种命名方式通过文件偏移量来精确标识每个文件,方便后续的定位与管理。
2. ConsumeQueue
ConsumeQueue 是消息消费队列,它为每个主题和队列维护了一个索引结构,指向 CommitLog 中的具体消息位置。ConsumeQueue 存储了消息在 CommitLog 中的偏移量、消息长度和消息 Tag 的哈希值等信息。
以一个简单的主题 testTopic
为例,其对应的 ConsumeQueue 可能存储如下信息:
偏移量 | 消息长度 | Tag 哈希值 |
---|---|---|
1024 | 512 | 123456 |
1536 | 256 | 789012 |
通过这些信息,消费者可以快速定位到 CommitLog 中所需的消息,提高消息消费的效率。
3. IndexFile
IndexFile 用于支持消息的按 Key 查找。当生产者发送消息时,可以为消息设置 Key。IndexFile 会记录这些 Key 与消息在 CommitLog 中的偏移量之间的映射关系。
假设我们有一条消息,其 Key 为 order123
,在 CommitLog 中的偏移量为 2048。IndexFile 会将这个映射关系记录下来,当需要根据 Key 查找消息时,就可以通过 IndexFile 快速定位到消息在 CommitLog 中的位置。
RocketMQ 消息写入流程
了解了基础结构后,我们深入探讨消息写入的具体流程。
1. 生产者发送消息
生产者将消息发送到 Broker,Broker 接收到消息后,会对消息进行一些预处理,比如检查消息的合法性、设置消息的一些属性等。
下面是一个简单的 Java 代码示例,展示如何使用 RocketMQ 生产者发送消息:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class Producer {
public static void main(String[] args) throws Exception {
// 创建生产者实例
DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
// 设置 NameServer 地址
producer.setNamesrvAddr("localhost:9876");
// 启动生产者
producer.start();
for (int i = 0; i < 10; i++) {
// 创建消息实例
Message message = new Message("testTopic", "TagA", ("Hello RocketMQ " + i).getBytes("UTF-8"));
// 设置消息 Key
message.setKeys("key" + i);
// 发送消息
SendResult sendResult = producer.send(message);
System.out.println(sendResult);
}
// 关闭生产者
producer.shutdown();
}
}
在这个示例中,我们创建了一个生产者实例,设置了 NameServer 地址并启动。然后循环发送 10 条消息,每条消息都设置了主题、标签和 Key。
2. 消息写入 CommitLog
Broker 预处理完消息后,会将消息顺序写入 CommitLog。由于是顺序写入,磁盘 I/O 的效率非常高。
在 RocketMQ 的源码中,CommitLog#putMessage
方法负责将消息写入 CommitLog。以下是简化后的关键代码片段:
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
// 计算写入位置
long wroteOffset = this.wrotePosition.get();
// 检查是否需要切换文件
if (wroteOffset + msg.getBody().length > this.fileSize) {
// 切换文件逻辑
}
// 写入消息头和消息体
ByteBuffer byteBuffer = ByteBuffer.wrap(msg.getBody());
this.mappedFile.appendMessage(byteBuffer);
// 更新写入位置
this.wrotePosition.addAndGet(msg.getBody().length);
return new PutMessageResult(PutMessageStatus.PUT_OK, wroteOffset);
}
这段代码首先获取当前写入位置,检查是否需要切换文件。如果不需要,就将消息体写入当前的 CommitLog 文件,并更新写入位置。
3. 构建 ConsumeQueue 和 IndexFile
消息写入 CommitLog 成功后,Broker 会异步构建 ConsumeQueue 和 IndexFile。
对于 ConsumeQueue,ConsumeQueue#putMessagePositionInfoWrapper
方法负责将消息在 CommitLog 中的位置等信息写入 ConsumeQueue。代码如下:
public void putMessagePositionInfoWrapper(final MessageExtBrokerInner msg, final long offset, final int size) {
// 计算 ConsumeQueue 写入位置
long qqOffset = this.getWriteOffset();
// 构建消息位置信息
MessageQueueItem mqI = new MessageQueueItem(offset, size, msg.getTagsCode());
ByteBuffer byteBuffer = ByteBuffer.wrap(mqI.encode());
// 写入 ConsumeQueue
this.mappedFile.appendMessage(byteBuffer);
// 更新写入位置
this.putMessagePositionInfo(qqOffset, mqI.getSize());
}
这段代码计算了 ConsumeQueue 的写入位置,构建了消息位置信息并写入 ConsumeQueue 文件,同时更新写入位置。
对于 IndexFile,IndexService#putMessage
方法负责将消息的 Key 与偏移量的映射关系写入 IndexFile。代码如下:
public void putMessage(final MessageExtBrokerInner msg) {
// 获取消息 Key
String keys = msg.getKeys();
if (keys != null && keys.length() > 0) {
String[] keysSet = keys.split(MessageConst.KEY_SEPARATOR);
for (String key : keysSet) {
// 计算索引位置
long indexPos = this.getAndIncreaseIndexCount();
IndexHdr indexHdr = new IndexHdr();
indexHdr.setKeyHash(key.hashCode());
indexHdr.setMsgIndex(msg.getCommitLogOffset());
indexHdr.setNextIndex(this.indexHeader.getIndexCount() * IndexHdr.SIZE);
// 写入 IndexFile
ByteBuffer byteBuffer = ByteBuffer.wrap(indexHdr.encode());
this.indexFile.appendMessage(byteBuffer);
// 更新索引头信息
this.indexHeader.incHashSlotCount();
this.indexHeader.incIndexCount();
}
}
}
这段代码首先获取消息的 Key,对每个 Key 计算索引位置,构建索引头信息并写入 IndexFile,同时更新索引头的相关计数。
RocketMQ 消息读取流程
消息的读取流程同样涉及到多个组件的协同工作。
1. 消费者拉取消息
消费者通过向 Broker 发送拉取请求来获取消息。消费者可以设置拉取的主题、队列、偏移量等参数。
以下是一个简单的 Java 代码示例,展示如何使用 RocketMQ 消费者拉取消息:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws Exception {
// 创建消费者实例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
// 设置 NameServer 地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅主题
consumer.subscribe("testTopic", "*");
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
System.out.println("Consumer started");
}
}
在这个示例中,我们创建了一个消费者实例,设置了 NameServer 地址并订阅了 testTopic
。然后注册了一个消息监听器,当有消息到达时,会打印消息内容。
2. Broker 从 ConsumeQueue 获取消息位置
Broker 接收到消费者的拉取请求后,首先从 ConsumeQueue 中获取消息在 CommitLog 中的位置信息。
在 RocketMQ 源码中,ConsumeQueue#selectOneMessagePosition
方法用于从 ConsumeQueue 中获取指定偏移量的消息位置信息。代码如下:
public SelectMappedBufferResult selectOneMessagePosition(final long offset) {
// 计算偏移量在文件中的位置
int pos = (int) (offset * MessageQueueItem.SIZE);
// 读取 ConsumeQueue 文件
SelectMappedBufferResult result = this.mappedFile.selectMappedBuffer(pos, MessageQueueItem.SIZE);
if (result != null) {
ByteBuffer byteBuffer = result.getByteBuffer();
// 解析消息位置信息
long offsetPy = byteBuffer.getLong();
int sizePy = byteBuffer.getInt();
long tagsCode = byteBuffer.getLong();
return new SelectMappedBufferResult(result.getByteBuffer(), offsetPy, sizePy, tagsCode);
}
return null;
}
这段代码计算了偏移量在 ConsumeQueue 文件中的位置,读取文件内容并解析出消息在 CommitLog 中的偏移量、消息长度等信息。
3. Broker 从 CommitLog 读取消息
获取到消息在 CommitLog 中的位置后,Broker 从 CommitLog 中读取具体的消息内容。
CommitLog#selectMappedBufferByOffset
方法用于从 CommitLog 中读取指定偏移量的消息。代码如下:
public SelectMappedBufferResult selectMappedBufferByOffset(final long offset) {
// 计算偏移量在文件中的位置
int pos = (int) (offset % this.fileSize);
// 定位到具体的 CommitLog 文件
MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset);
if (mappedFile != null) {
// 读取 CommitLog 文件
SelectMappedBufferResult result = mappedFile.selectMappedBuffer(pos);
return result;
}
return null;
}
这段代码计算了偏移量在文件中的位置,定位到对应的 CommitLog 文件并读取文件内容,返回包含消息的 SelectMappedBufferResult
。
RocketMQ 消息存储的优化策略
为了进一步提高消息存储的性能和可靠性,RocketMQ 采用了多种优化策略。
1. 刷盘策略
RocketMQ 支持两种刷盘策略:同步刷盘和异步刷盘。
同步刷盘确保消息在写入 CommitLog 后立即刷盘到磁盘,保证了数据的可靠性,但会降低写入性能。而异步刷盘则是将消息先写入内存,然后由后台线程异步刷盘,提高了写入性能,但在系统崩溃时可能会丢失少量未刷盘的消息。
可以通过修改 Broker 配置文件来设置刷盘策略,例如:
<broker>
<flushDiskType>ASYNC_FLUSH</flushDiskType>
</broker>
上述配置表示采用异步刷盘策略。
2. 内存映射文件
RocketMQ 使用内存映射文件(MappedByteBuffer)来进行文件的读写操作。通过内存映射,操作系统会将文件内容映射到进程的虚拟地址空间,应用程序可以像访问内存一样直接访问文件,避免了传统 I/O 操作中的用户态与内核态切换,大大提高了读写效率。
在 RocketMQ 源码中,MappedFile
类封装了内存映射文件的操作。例如,MappedFile#appendMessage
方法通过 MappedByteBuffer
将消息写入文件:
public int appendMessage(final ByteBuffer byteBuffer) {
// 获取当前写入位置
int currentPos = this.wrotePosition.get();
if (currentPos + byteBuffer.limit() <= this.fileSize) {
try {
// 写入消息
this.mappedByteBuffer.put(byteBuffer);
} catch (Exception e) {
log.error("Error occurred when writing to mapped buffer.", e);
}
// 更新写入位置
this.wrotePosition.addAndGet(byteBuffer.limit());
return byteBuffer.limit();
}
return -1;
}
3. 多线程处理
RocketMQ 在消息写入和读取过程中采用多线程机制来提高并发性能。例如,在消息写入时,除了主线程将消息写入 CommitLog 外,还有专门的线程负责异步构建 ConsumeQueue 和 IndexFile,减少了主线程的负担,提高了整体的写入效率。
在消息读取时,Broker 也可以通过多线程并行处理多个消费者的拉取请求,提高系统的并发处理能力。
RocketMQ 消息存储的高可用性
RocketMQ 通过主从架构和数据复制机制来实现消息存储的高可用性。
1. 主从架构
RocketMQ 的 Broker 采用主从架构,一个 Master Broker 可以有多个 Slave Broker。Master Broker 负责处理消息的写入和读取请求,Slave Broker 则从 Master Broker 同步数据,作为备份。
在 Broker 配置文件中,可以配置 Master 和 Slave 的关系,例如:
<broker>
<brokerId>0</brokerId>
<brokerRole>ASYNC_MASTER</brokerRole>
<masterAddr>192.168.1.100:10911</masterAddr>
</broker>
<broker>
<brokerId>1</brokerId>
<brokerRole>SLAVE</brokerRole>
<masterAddr>192.168.1.100:10911</masterAddr>
</broker>
上述配置中,brokerId
为 0 的是 Master Broker,brokerId
为 1 的是 Slave Broker,Slave Broker 从 Master Broker 同步数据。
2. 数据复制
RocketMQ 采用同步复制和异步复制两种方式进行数据复制。
同步复制确保 Slave Broker 接收到消息并写入磁盘后,Master Broker 才返回写入成功的响应,保证了数据的强一致性,但会降低写入性能。而异步复制则是 Master Broker 先返回写入成功的响应,然后异步将消息复制到 Slave Broker,提高了写入性能,但在 Master Broker 故障时可能会丢失少量未复制的消息。
可以通过修改 Broker 配置文件来设置复制方式,例如:
<broker>
<brokerRole>SYNC_MASTER</brokerRole>
</broker>
上述配置表示采用同步复制方式。
通过这些机制,RocketMQ 实现了消息存储的高可用性,确保在部分 Broker 出现故障时,消息仍然能够可靠地存储和读取。