MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

RocketMQ 架构深度剖析

2024-03-224.2k 阅读

RocketMQ 基础概念

在深入剖析 RocketMQ 架构之前,我们先来了解一些基础概念。

1. 消息(Message)

消息是 RocketMQ 传递的基本单元,它可以是任何类型的数据,比如订单信息、用户操作日志等。消息包含了消息体(Body)、消息头(Header)等部分。消息体承载了实际业务数据,而消息头则包含了一些元数据信息,例如消息的主题(Topic)、标签(Tag)、唯一标识(MessageId)等。

2. 主题(Topic)

主题是对消息进行分类的逻辑概念,它类似于数据库中的表,用于区分不同类型的消息。例如,在电商系统中,可以有 “order_topic” 用于处理订单相关消息,“payment_topic” 用于处理支付相关消息。不同的生产者可以向同一个主题发送消息,而消费者则可以订阅感兴趣的主题来接收消息。

3. 标签(Tag)

标签是在主题基础上对消息进行更细粒度的划分。一个主题可以包含多个标签,通过标签可以让消费者更精准地过滤接收消息。比如在 “order_topic” 主题下,可以有 “create_order_tag”、“cancel_order_tag” 等标签,这样消费者就可以只关注创建订单或取消订单的消息。

4. 生产者(Producer)

生产者负责创建并向 RocketMQ 发送消息。它可以是应用程序中的一个模块,根据业务逻辑生成消息,并选择合适的主题和标签将消息发送到 RocketMQ 集群。生产者有多种发送方式,包括同步发送、异步发送和单向发送,以满足不同的业务需求。

5. 消费者(Consumer)

消费者从 RocketMQ 中接收并处理消息。它通过订阅主题和标签来指定自己感兴趣的消息类型。消费者可以分为推模式(Push Consumer)和拉模式(Pull Consumer)。推模式下,RocketMQ 主动将消息推送给消费者;拉模式下,消费者主动从 RocketMQ 拉取消息。

6. 队列(Queue)

在 RocketMQ 中,队列是消息存储和分发的物理单位。一个主题可以包含多个队列,通过增加队列数量可以提高消息的并行处理能力。生产者发送消息时,会根据一定的策略将消息发送到不同的队列中。消费者则通过负载均衡机制从各个队列中获取消息进行处理。

RocketMQ 架构组件

1. NameServer

NameServer 是 RocketMQ 的轻量级元数据管理中心,它主要负责存储和管理 Topic、Broker 等元数据信息。每个 NameServer 节点之间相互独立,不进行数据同步。

NameServer 的主要功能包括:

  • Broker 管理:Broker 在启动时会向所有存活的 NameServer 注册自己的信息,包括 Broker 名称、IP 地址、端口号、所负责的 Topic 等。NameServer 会维护这些信息,并提供给生产者和消费者查询。
  • Topic 路由信息管理:NameServer 保存了 Topic 与 Broker 之间的映射关系,即每个 Topic 分布在哪些 Broker 上,以及每个 Broker 上该 Topic 包含哪些队列。生产者和消费者通过查询 NameServer 获取 Topic 的路由信息,从而知道如何与 Broker 进行交互。

NameServer 的架构设计使得它具有高可用性和易扩展性。由于各个 NameServer 节点相互独立,当某个节点出现故障时,不会影响其他节点的正常运行,并且可以方便地添加新的 NameServer 节点来提高系统的整体性能。

2. Broker

Broker 是 RocketMQ 的核心组件,它负责存储消息、提供消息的读写服务,并与生产者、消费者进行交互。一个 RocketMQ 集群可以包含多个 Broker,每个 Broker 可以负责多个 Topic 的消息存储和处理。

Broker 的主要功能包括:

  • 消息存储:Broker 将接收到的消息持久化到本地磁盘,采用了基于文件系统的存储方式,通过 CommitLog 和 ConsumeQueue 等数据结构来实现高效的消息存储和检索。
  • 消息发送处理:Broker 接收生产者发送的消息,对消息进行合法性校验、存储等操作,并返回发送结果给生产者。在处理消息发送时,Broker 会根据负载均衡策略将消息分配到不同的队列中。
  • 消息拉取处理:Broker 处理消费者的消息拉取请求,根据消费者的订阅信息和偏移量(Offset),从存储中读取相应的消息并返回给消费者。
  • 高可用性保障:Broker 通过主从架构(Master - Slave)来实现高可用性。Master 负责处理读写请求,Slave 则定期从 Master 同步数据,当 Master 出现故障时,Slave 可以切换为 Master 继续提供服务。

3. Producer

生产者是消息的发送端,在 RocketMQ 中,生产者具有以下特点和功能:

  • 多种发送方式
  • 同步发送:生产者发送消息后,会阻塞等待 Broker 的响应,直到收到发送成功或失败的结果。这种方式适用于对消息发送可靠性要求较高的场景,比如订单创建消息,确保消息一定被成功发送到 Broker。示例代码如下:
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
Message message = new Message("topic", "tag", "Hello, RocketMQ!".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(message);
System.out.printf("%s%n", sendResult);
producer.shutdown();
  • 异步发送:生产者发送消息后,不会阻塞等待响应,而是通过回调函数来处理发送结果。这种方式适用于对响应时间敏感,但对消息发送可靠性有一定要求的场景,比如日志记录消息。示例代码如下:
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
Message message = new Message("topic", "tag", "Hello, RocketMQ!".getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.send(message, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        System.out.printf("Send message success: %s%n", sendResult);
    }
    @Override
    public void onException(Throwable e) {
        System.out.printf("Send message failed: %s%n", e);
    }
});
Thread.sleep(1000);
producer.shutdown();
  • 单向发送:生产者发送消息后,不等待 Broker 的响应,直接返回。这种方式适用于对消息发送可靠性要求较低,且对性能要求较高的场景,比如心跳消息。示例代码如下:
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
Message message = new Message("topic", "tag", "Hello, RocketMQ!".getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.sendOneway(message);
producer.shutdown();
  • 负载均衡:生产者在发送消息时,会根据 Topic 的路由信息和负载均衡策略,将消息均匀地发送到不同的 Broker 队列中,以提高系统的整体性能和消息处理能力。常见的负载均衡策略有轮询、随机等。

4. Consumer

消费者是消息的接收端,在 RocketMQ 中有以下重要特性和功能:

  • 推模式与拉模式
  • 推模式(Push Consumer):RocketMQ 主动将消息推送给消费者。消费者通过注册监听器(MessageListener)来处理接收到的消息。这种模式实时性较高,适用于对消息处理及时性要求较高的场景。示例代码如下:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("topic", "tag");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for (MessageExt msg : msgs) {
            System.out.printf("Receive message: %s%n", new String(msg.getBody()));
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();
  • 拉模式(Pull Consumer):消费者主动从 Broker 拉取消息。消费者需要自行控制拉取的频率和数量。这种模式灵活性较高,适用于对消息处理有特殊需求的场景,比如批量处理消息。示例代码如下:
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("consumer_group");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.start();
try {
    PullResult pullResult = consumer.pullBlockIfNotFound("topic", "tag", getMessageQueue(), getOffset(), 32);
    System.out.printf("Pull message result: %s%n", pullResult);
} catch (Exception e) {
    e.printStackTrace();
}
consumer.shutdown();
  • 负载均衡:多个消费者实例在订阅同一个 Topic 时,会通过负载均衡机制来分配队列的消费权,确保每个消费者实例只处理部分队列的消息,从而实现并行消费,提高消费效率。常见的负载均衡算法有平均分配、一致性哈希等。
  • 消费重试:当消费者处理消息失败时,RocketMQ 支持自动重试机制。对于非幂等性的消息处理逻辑,需要注意重试可能带来的重复消费问题。消费者可以通过设置重试次数、重试间隔等参数来控制重试策略。

RocketMQ 消息存储机制

1. CommitLog

CommitLog 是 RocketMQ 存储消息的核心文件,它采用了顺序写的方式,将所有 Topic 的消息都顺序追加到一个日志文件中。这种设计方式可以充分利用磁盘的顺序写性能优势,提高消息写入的效率。

CommitLog 文件的结构如下:

  • 消息物理结构:每个消息在 CommitLog 中都有固定的格式,包括消息长度、消息体长度、Topic 长度、消息 Tag 哈希值、存储时间、存储偏移量等元数据,以及消息体内容。这种结构设计使得 RocketMQ 可以快速定位和读取消息。
  • 文件管理:CommitLog 文件默认大小为 1G,当一个文件写满后,会自动创建新的文件继续写入。通过这种方式,CommitLog 可以无限制地存储消息,并且方便进行文件的管理和维护。

2. ConsumeQueue

ConsumeQueue 是消息消费队列,它是 CommitLog 的索引文件。每个 Topic 的每个队列都对应一个 ConsumeQueue 文件,用于记录该队列中消息在 CommitLog 中的位置信息,以便消费者快速定位和拉取消息。

ConsumeQueue 文件的结构如下:

  • 索引项结构:每个索引项包含了消息在 CommitLog 中的偏移量、消息长度、消息 Tag 哈希值等信息。消费者通过读取 ConsumeQueue 文件,可以快速找到自己需要消费的消息在 CommitLog 中的位置,然后从 CommitLog 中读取消息内容。
  • 文件管理:ConsumeQueue 文件也采用了分段存储的方式,每个文件默认大小为 30W 条索引项。当一个文件写满后,会创建新的文件继续记录。这种设计方式使得 ConsumeQueue 可以高效地管理大量的消息索引,并且在消费者进行消息拉取时,可以快速定位到相应的消息位置。

3. IndexFile

IndexFile 是 RocketMQ 为了支持按照消息 Key 进行消息查询而设计的索引文件。它通过哈希表的方式,将消息 Key 与消息在 CommitLog 中的位置进行映射,从而实现快速的消息查询。

IndexFile 文件的结构如下:

  • 哈希表结构:IndexFile 内部维护了一个哈希表,每个哈希表项包含了消息 Key 的哈希值、消息在 CommitLog 中的偏移量、时间戳等信息。当生产者发送消息时,如果设置了消息 Key,RocketMQ 会将该消息的相关信息写入 IndexFile 中。
  • 文件管理:IndexFile 文件也有固定的大小,当文件写满后,会创建新的文件。通过 IndexFile,消费者可以根据消息 Key 快速查询到消息在 CommitLog 中的位置,进而读取消息内容。这种机制在一些需要根据特定 Key 进行消息检索的场景中非常有用,比如根据订单号查询订单相关的消息。

RocketMQ 高可用性架构

1. Broker 主从架构

RocketMQ 通过 Broker 的主从架构来实现高可用性。在一个 Broker 集群中,每个 Broker 都有一个 Master 节点和若干个 Slave 节点。

Master 节点负责处理所有的读写请求,包括生产者发送消息和消费者拉取消息。Slave 节点则定期从 Master 节点同步数据,保持与 Master 节点的数据一致性。

数据同步方式主要有两种:

  • 同步复制:Master 节点在接收到生产者发送的消息后,会等待所有 Slave 节点都成功复制该消息后,才向生产者返回成功响应。这种方式可以确保数据的强一致性,但会降低消息发送的性能,因为需要等待所有 Slave 节点的确认。
  • 异步复制:Master 节点在接收到生产者发送的消息后,直接向生产者返回成功响应,然后异步将消息复制给 Slave 节点。这种方式可以提高消息发送的性能,但在 Master 节点出现故障时,可能会丢失部分尚未复制到 Slave 节点的消息。

2. 故障转移

当 Master 节点出现故障时,RocketMQ 会自动进行故障转移,将 Slave 节点提升为 Master 节点继续提供服务。故障转移的过程如下:

  • 检测故障:NameServer 会定期检测 Broker 节点的存活状态,当发现 Master 节点不可用时,会标记该节点为故障状态。
  • 选择新 Master:RocketMQ 会从该 Broker 的 Slave 节点中选择一个合适的节点提升为 Master 节点。选择的策略通常是根据 Slave 节点的数据同步情况、性能等因素来确定。
  • 通知生产者和消费者:NameServer 在完成故障转移后,会将新的 Broker 拓扑结构信息通知给生产者和消费者。生产者和消费者在下次请求时,会根据新的拓扑结构信息与新的 Master 节点进行交互。

通过这种主从架构和故障转移机制,RocketMQ 可以在保证数据可靠性的同时,实现高可用性,确保系统在面对节点故障时仍然能够正常运行。

RocketMQ 负载均衡机制

1. 生产者负载均衡

生产者在发送消息时,需要将消息均匀地发送到不同的 Broker 队列中,以实现负载均衡。RocketMQ 生产者采用了多种负载均衡策略:

  • 轮询策略:默认情况下,生产者会采用轮询的方式,依次将消息发送到 Topic 对应的各个队列中。这种策略简单且公平,能够保证消息在各个队列中的分布相对均匀。示例代码如下:
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setSendLatencyFaultEnable(true);
producer.start();
Message message = new Message("topic", "tag", "Hello, RocketMQ!".getBytes(RemotingHelper.DEFAULT_CHARSET));
for (int i = 0; i < 10; i++) {
    try {
        SendResult sendResult = producer.send(message);
        System.out.printf("Send message result: %s%n", sendResult);
    } catch (Exception e) {
        e.printStackTrace();
    }
}
producer.shutdown();
  • 随机策略:生产者也可以采用随机的方式选择队列发送消息。这种策略在一定程度上也能实现负载均衡,但可能会导致某些队列消息分布不均匀的情况。
  • 根据 Broker 负载均衡:生产者可以根据 Broker 的负载情况来选择队列发送消息。RocketMQ 会实时监控 Broker 的负载信息,生产者在发送消息时,会优先选择负载较轻的 Broker 队列进行发送,以提高系统的整体性能。

2. 消费者负载均衡

多个消费者实例在订阅同一个 Topic 时,需要通过负载均衡机制来分配队列的消费权,以实现并行消费。RocketMQ 消费者采用了以下负载均衡策略:

  • 平均分配策略:将 Topic 的所有队列平均分配给各个消费者实例。例如,如果有 3 个消费者实例订阅了一个包含 6 个队列的 Topic,那么每个消费者实例将负责消费 2 个队列的消息。这种策略简单直观,能够充分利用消费者的并行处理能力。示例代码如下:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("topic", "tag");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        System.out.printf("Consumer %s receive messages: %s%n", Thread.currentThread().getName(), msgs);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();
  • 一致性哈希策略:通过一致性哈希算法将队列分配给消费者实例。这种策略在消费者实例数量发生变化时,能够尽量减少队列分配的变动,从而提高系统的稳定性。在一致性哈希策略下,当新增或减少消费者实例时,只有部分队列的消费权会发生转移,而不是像平均分配策略那样,所有队列都需要重新分配。

通过这些负载均衡机制,RocketMQ 能够有效地提高系统的整体性能和资源利用率,确保在高并发场景下,生产者和消费者都能高效地进行消息的发送和消费。

RocketMQ 事务消息机制

1. 事务消息概念

在一些业务场景中,需要保证消息的发送与本地业务操作的原子性,即要么本地业务操作和消息发送都成功,要么都失败。RocketMQ 的事务消息机制就是为了解决这类问题而设计的。

事务消息的处理流程分为三个阶段:

  • 发送 Half 消息:生产者首先向 Broker 发送 Half 消息,Half 消息是一种特殊的消息,它对消费者不可见。Broker 在接收到 Half 消息后,会返回一个成功响应给生产者。
  • 执行本地事务:生产者在接收到 Half 消息发送成功的响应后,开始执行本地业务操作。根据本地业务操作的执行结果,生产者向 Broker 发送 Commit 或 Rollback 消息。
  • 处理事务状态:Broker 在接收到生产者发送的 Commit 或 Rollback 消息后,会相应地将 Half 消息标记为可消费或删除。如果 Broker 长时间没有收到生产者发送的 Commit 或 Rollback 消息,会主动向生产者发送事务状态回查请求,生产者需要根据本地业务操作的实际状态进行响应。

2. 代码示例

下面是一个简单的 RocketMQ 事务消息示例代码:

TransactionMQProducer producer = new TransactionMQProducer("producer_group");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setTransactionListener(new TransactionListener() {
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // 执行本地业务操作
        try {
            // 模拟本地业务操作成功
            return LocalTransactionState.COMMIT_MESSAGE;
        } catch (Exception e) {
            // 模拟本地业务操作失败
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // 处理事务状态回查
        return LocalTransactionState.COMMIT_MESSAGE;
    }
});
producer.start();
Message message = new Message("topic", "tag", "Hello, RocketMQ!".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(message, null);
System.out.printf("%s%n", sendResult);
producer.shutdown();

通过事务消息机制,RocketMQ 能够满足一些对数据一致性要求较高的业务场景,确保消息的可靠发送与本地业务操作的原子性。

RocketMQ 与其他消息队列的比较

1. 与 Kafka 的比较

  • 性能:Kafka 以高吞吐量著称,尤其在处理海量日志数据等场景下表现出色。RocketMQ 在性能方面也不逊色,特别是在低延迟和消息可靠性方面有较好的平衡。在一些对消息处理及时性要求较高的场景中,RocketMQ 的性能优势更为明显。
  • 消息可靠性:Kafka 默认采用异步复制方式,在某些情况下可能会丢失少量消息。而 RocketMQ 支持同步复制和异步复制两种方式,并且在故障转移方面有较为完善的机制,能够更好地保证消息的可靠性。
  • 功能特性:RocketMQ 提供了丰富的功能,如事务消息、消息顺序消费、消息重试等,这些功能在企业级应用中非常实用。Kafka 虽然也在不断完善这些功能,但相对来说 RocketMQ 的功能更加成熟和丰富。

2. 与 RabbitMQ 的比较

  • 性能:RabbitMQ 基于 Erlang 语言开发,在高并发场景下性能表现良好,但在处理大规模数据时,吞吐量相对 RocketMQ 和 Kafka 会低一些。
  • 消息模型:RabbitMQ 采用了 AMQP 协议,具有丰富的消息模型,如 direct、topic、fanout 等。RocketMQ 的消息模型相对简单,主要通过 Topic 和 Tag 进行消息分类和过滤,但这也使得 RocketMQ 的使用更加简洁明了。
  • 应用场景:RabbitMQ 适用于对消息可靠性要求极高,且对消息模型有复杂需求的场景,如金融行业的一些业务。RocketMQ 则更适合在互联网应用中,尤其是对性能、功能特性和可靠性都有一定要求的场景。

通过与其他常见消息队列的比较,可以看出 RocketMQ 在性能、可靠性和功能特性等方面都有自己的优势,能够满足不同业务场景的需求。

RocketMQ 应用场景

1. 异步处理

在许多应用中,存在一些耗时较长的操作,如订单处理后的库存更新、发送邮件通知等。通过将这些操作封装成消息发送到 RocketMQ 中,由消费者异步处理,可以显著提高系统的响应速度。例如,在电商系统中,用户下单后,订单创建的消息可以立即返回给用户,而库存更新和邮件通知等操作则由 RocketMQ 的消费者在后台异步执行。

2. 流量削峰

在一些高并发的场景下,如电商的促销活动、抢购等,瞬间的流量可能会对系统造成巨大压力。RocketMQ 可以作为流量缓冲,生产者将大量的请求消息发送到 RocketMQ 中,消费者则按照系统能够承受的速度从 RocketMQ 中拉取消息进行处理,从而有效地削峰填谷,保护后端系统不被高流量冲垮。

3. 系统解耦

在大型分布式系统中,各个模块之间往往存在复杂的依赖关系。通过使用 RocketMQ,模块之间可以通过消息进行通信,而不需要直接调用对方的接口,从而实现系统的解耦。例如,用户注册模块在用户注册成功后,可以发送一条消息到 RocketMQ,通知其他模块如积分系统、推荐系统等进行相应的处理,各个模块之间的耦合度大大降低。

4. 数据分发

RocketMQ 可以用于将数据分发给多个不同的系统或模块。例如,在一个大数据平台中,业务系统产生的日志数据可以通过 RocketMQ 分发给日志分析系统、数据仓库等进行不同的处理,实现数据的高效分发和共享。

通过这些应用场景可以看出,RocketMQ 在提升系统性能、增强系统稳定性和实现系统解耦等方面都发挥着重要作用,是一款非常强大且实用的消息队列中间件。