MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

RocketMQ 消息存储机制探秘

2023-11-036.9k 阅读

RocketMQ 消息存储基础结构

RocketMQ 的消息存储是其核心功能之一,它确保了消息的可靠持久化与高效读取。其存储结构主要由以下几个关键部分组成:

1. CommitLog

CommitLog 是 RocketMQ 存储消息的核心文件,所有主题的消息都顺序写入到这个文件中。这种设计避免了像传统数据库那样随机 I/O 的性能瓶颈,大大提高了写入效率。每个 CommitLog 文件默认大小为 1G,当一个文件写满后,会自动切换到下一个文件继续写入。

例如,假设当前 CommitLog 文件名为 commitlog-00000000000000000000,当它写满后,下一个文件将命名为 commitlog-00000000001073741824。这种命名方式通过文件偏移量来精确标识每个文件,方便后续的定位与管理。

2. ConsumeQueue

ConsumeQueue 是消息消费队列,它为每个主题和队列维护了一个索引结构,指向 CommitLog 中的具体消息位置。ConsumeQueue 存储了消息在 CommitLog 中的偏移量、消息长度和消息 Tag 的哈希值等信息。

以一个简单的主题 testTopic 为例,其对应的 ConsumeQueue 可能存储如下信息:

偏移量消息长度Tag 哈希值
1024512123456
1536256789012

通过这些信息,消费者可以快速定位到 CommitLog 中所需的消息,提高消息消费的效率。

3. IndexFile

IndexFile 用于支持消息的按 Key 查找。当生产者发送消息时,可以为消息设置 Key。IndexFile 会记录这些 Key 与消息在 CommitLog 中的偏移量之间的映射关系。

假设我们有一条消息,其 Key 为 order123,在 CommitLog 中的偏移量为 2048。IndexFile 会将这个映射关系记录下来,当需要根据 Key 查找消息时,就可以通过 IndexFile 快速定位到消息在 CommitLog 中的位置。

RocketMQ 消息写入流程

了解了基础结构后,我们深入探讨消息写入的具体流程。

1. 生产者发送消息

生产者将消息发送到 Broker,Broker 接收到消息后,会对消息进行一些预处理,比如检查消息的合法性、设置消息的一些属性等。

下面是一个简单的 Java 代码示例,展示如何使用 RocketMQ 生产者发送消息:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class Producer {
    public static void main(String[] args) throws Exception {
        // 创建生产者实例
        DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
        // 设置 NameServer 地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动生产者
        producer.start();

        for (int i = 0; i < 10; i++) {
            // 创建消息实例
            Message message = new Message("testTopic", "TagA", ("Hello RocketMQ " + i).getBytes("UTF-8"));
            // 设置消息 Key
            message.setKeys("key" + i);
            // 发送消息
            SendResult sendResult = producer.send(message);
            System.out.println(sendResult);
        }

        // 关闭生产者
        producer.shutdown();
    }
}

在这个示例中,我们创建了一个生产者实例,设置了 NameServer 地址并启动。然后循环发送 10 条消息,每条消息都设置了主题、标签和 Key。

2. 消息写入 CommitLog

Broker 预处理完消息后,会将消息顺序写入 CommitLog。由于是顺序写入,磁盘 I/O 的效率非常高。

在 RocketMQ 的源码中,CommitLog#putMessage 方法负责将消息写入 CommitLog。以下是简化后的关键代码片段:

public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
    // 计算写入位置
    long wroteOffset = this.wrotePosition.get();

    // 检查是否需要切换文件
    if (wroteOffset + msg.getBody().length > this.fileSize) {
        // 切换文件逻辑
    }

    // 写入消息头和消息体
    ByteBuffer byteBuffer = ByteBuffer.wrap(msg.getBody());
    this.mappedFile.appendMessage(byteBuffer);

    // 更新写入位置
    this.wrotePosition.addAndGet(msg.getBody().length);

    return new PutMessageResult(PutMessageStatus.PUT_OK, wroteOffset);
}

这段代码首先获取当前写入位置,检查是否需要切换文件。如果不需要,就将消息体写入当前的 CommitLog 文件,并更新写入位置。

3. 构建 ConsumeQueue 和 IndexFile

消息写入 CommitLog 成功后,Broker 会异步构建 ConsumeQueue 和 IndexFile。

对于 ConsumeQueue,ConsumeQueue#putMessagePositionInfoWrapper 方法负责将消息在 CommitLog 中的位置等信息写入 ConsumeQueue。代码如下:

public void putMessagePositionInfoWrapper(final MessageExtBrokerInner msg, final long offset, final int size) {
    // 计算 ConsumeQueue 写入位置
    long qqOffset = this.getWriteOffset();

    // 构建消息位置信息
    MessageQueueItem mqI = new MessageQueueItem(offset, size, msg.getTagsCode());
    ByteBuffer byteBuffer = ByteBuffer.wrap(mqI.encode());

    // 写入 ConsumeQueue
    this.mappedFile.appendMessage(byteBuffer);

    // 更新写入位置
    this.putMessagePositionInfo(qqOffset, mqI.getSize());
}

这段代码计算了 ConsumeQueue 的写入位置,构建了消息位置信息并写入 ConsumeQueue 文件,同时更新写入位置。

对于 IndexFile,IndexService#putMessage 方法负责将消息的 Key 与偏移量的映射关系写入 IndexFile。代码如下:

public void putMessage(final MessageExtBrokerInner msg) {
    // 获取消息 Key
    String keys = msg.getKeys();
    if (keys != null && keys.length() > 0) {
        String[] keysSet = keys.split(MessageConst.KEY_SEPARATOR);
        for (String key : keysSet) {
            // 计算索引位置
            long indexPos = this.getAndIncreaseIndexCount();
            IndexHdr indexHdr = new IndexHdr();
            indexHdr.setKeyHash(key.hashCode());
            indexHdr.setMsgIndex(msg.getCommitLogOffset());
            indexHdr.setNextIndex(this.indexHeader.getIndexCount() * IndexHdr.SIZE);

            // 写入 IndexFile
            ByteBuffer byteBuffer = ByteBuffer.wrap(indexHdr.encode());
            this.indexFile.appendMessage(byteBuffer);

            // 更新索引头信息
            this.indexHeader.incHashSlotCount();
            this.indexHeader.incIndexCount();
        }
    }
}

这段代码首先获取消息的 Key,对每个 Key 计算索引位置,构建索引头信息并写入 IndexFile,同时更新索引头的相关计数。

RocketMQ 消息读取流程

消息的读取流程同样涉及到多个组件的协同工作。

1. 消费者拉取消息

消费者通过向 Broker 发送拉取请求来获取消息。消费者可以设置拉取的主题、队列、偏移量等参数。

以下是一个简单的 Java 代码示例,展示如何使用 RocketMQ 消费者拉取消息:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {
    public static void main(String[] args) throws Exception {
        // 创建消费者实例
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
        // 设置 NameServer 地址
        consumer.setNamesrvAddr("localhost:9876");
        // 订阅主题
        consumer.subscribe("testTopic", "*");

        // 注册消息监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 启动消费者
        consumer.start();
        System.out.println("Consumer started");
    }
}

在这个示例中,我们创建了一个消费者实例,设置了 NameServer 地址并订阅了 testTopic。然后注册了一个消息监听器,当有消息到达时,会打印消息内容。

2. Broker 从 ConsumeQueue 获取消息位置

Broker 接收到消费者的拉取请求后,首先从 ConsumeQueue 中获取消息在 CommitLog 中的位置信息。

在 RocketMQ 源码中,ConsumeQueue#selectOneMessagePosition 方法用于从 ConsumeQueue 中获取指定偏移量的消息位置信息。代码如下:

public SelectMappedBufferResult selectOneMessagePosition(final long offset) {
    // 计算偏移量在文件中的位置
    int pos = (int) (offset * MessageQueueItem.SIZE);
    // 读取 ConsumeQueue 文件
    SelectMappedBufferResult result = this.mappedFile.selectMappedBuffer(pos, MessageQueueItem.SIZE);
    if (result != null) {
        ByteBuffer byteBuffer = result.getByteBuffer();
        // 解析消息位置信息
        long offsetPy = byteBuffer.getLong();
        int sizePy = byteBuffer.getInt();
        long tagsCode = byteBuffer.getLong();

        return new SelectMappedBufferResult(result.getByteBuffer(), offsetPy, sizePy, tagsCode);
    }
    return null;
}

这段代码计算了偏移量在 ConsumeQueue 文件中的位置,读取文件内容并解析出消息在 CommitLog 中的偏移量、消息长度等信息。

3. Broker 从 CommitLog 读取消息

获取到消息在 CommitLog 中的位置后,Broker 从 CommitLog 中读取具体的消息内容。

CommitLog#selectMappedBufferByOffset 方法用于从 CommitLog 中读取指定偏移量的消息。代码如下:

public SelectMappedBufferResult selectMappedBufferByOffset(final long offset) {
    // 计算偏移量在文件中的位置
    int pos = (int) (offset % this.fileSize);
    // 定位到具体的 CommitLog 文件
    MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset);
    if (mappedFile != null) {
        // 读取 CommitLog 文件
        SelectMappedBufferResult result = mappedFile.selectMappedBuffer(pos);
        return result;
    }
    return null;
}

这段代码计算了偏移量在文件中的位置,定位到对应的 CommitLog 文件并读取文件内容,返回包含消息的 SelectMappedBufferResult

RocketMQ 消息存储的优化策略

为了进一步提高消息存储的性能和可靠性,RocketMQ 采用了多种优化策略。

1. 刷盘策略

RocketMQ 支持两种刷盘策略:同步刷盘和异步刷盘。

同步刷盘确保消息在写入 CommitLog 后立即刷盘到磁盘,保证了数据的可靠性,但会降低写入性能。而异步刷盘则是将消息先写入内存,然后由后台线程异步刷盘,提高了写入性能,但在系统崩溃时可能会丢失少量未刷盘的消息。

可以通过修改 Broker 配置文件来设置刷盘策略,例如:

<broker>
    <flushDiskType>ASYNC_FLUSH</flushDiskType>
</broker>

上述配置表示采用异步刷盘策略。

2. 内存映射文件

RocketMQ 使用内存映射文件(MappedByteBuffer)来进行文件的读写操作。通过内存映射,操作系统会将文件内容映射到进程的虚拟地址空间,应用程序可以像访问内存一样直接访问文件,避免了传统 I/O 操作中的用户态与内核态切换,大大提高了读写效率。

在 RocketMQ 源码中,MappedFile 类封装了内存映射文件的操作。例如,MappedFile#appendMessage 方法通过 MappedByteBuffer 将消息写入文件:

public int appendMessage(final ByteBuffer byteBuffer) {
    // 获取当前写入位置
    int currentPos = this.wrotePosition.get();
    if (currentPos + byteBuffer.limit() <= this.fileSize) {
        try {
            // 写入消息
            this.mappedByteBuffer.put(byteBuffer);
        } catch (Exception e) {
            log.error("Error occurred when writing to mapped buffer.", e);
        }
        // 更新写入位置
        this.wrotePosition.addAndGet(byteBuffer.limit());
        return byteBuffer.limit();
    }
    return -1;
}

3. 多线程处理

RocketMQ 在消息写入和读取过程中采用多线程机制来提高并发性能。例如,在消息写入时,除了主线程将消息写入 CommitLog 外,还有专门的线程负责异步构建 ConsumeQueue 和 IndexFile,减少了主线程的负担,提高了整体的写入效率。

在消息读取时,Broker 也可以通过多线程并行处理多个消费者的拉取请求,提高系统的并发处理能力。

RocketMQ 消息存储的高可用性

RocketMQ 通过主从架构和数据复制机制来实现消息存储的高可用性。

1. 主从架构

RocketMQ 的 Broker 采用主从架构,一个 Master Broker 可以有多个 Slave Broker。Master Broker 负责处理消息的写入和读取请求,Slave Broker 则从 Master Broker 同步数据,作为备份。

在 Broker 配置文件中,可以配置 Master 和 Slave 的关系,例如:

<broker>
    <brokerId>0</brokerId>
    <brokerRole>ASYNC_MASTER</brokerRole>
    <masterAddr>192.168.1.100:10911</masterAddr>
</broker>

<broker>
    <brokerId>1</brokerId>
    <brokerRole>SLAVE</brokerRole>
    <masterAddr>192.168.1.100:10911</masterAddr>
</broker>

上述配置中,brokerId 为 0 的是 Master Broker,brokerId 为 1 的是 Slave Broker,Slave Broker 从 Master Broker 同步数据。

2. 数据复制

RocketMQ 采用同步复制和异步复制两种方式进行数据复制。

同步复制确保 Slave Broker 接收到消息并写入磁盘后,Master Broker 才返回写入成功的响应,保证了数据的强一致性,但会降低写入性能。而异步复制则是 Master Broker 先返回写入成功的响应,然后异步将消息复制到 Slave Broker,提高了写入性能,但在 Master Broker 故障时可能会丢失少量未复制的消息。

可以通过修改 Broker 配置文件来设置复制方式,例如:

<broker>
    <brokerRole>SYNC_MASTER</brokerRole>
</broker>

上述配置表示采用同步复制方式。

通过这些机制,RocketMQ 实现了消息存储的高可用性,确保在部分 Broker 出现故障时,消息仍然能够可靠地存储和读取。