MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

RocketMQ消息消费模型与负载均衡策略

2024-05-295.6k 阅读

RocketMQ 消息消费模型

在 RocketMQ 中,消息消费模型主要分为两种:集群消费(Clustering)广播消费(Broadcasting)

集群消费

集群消费模式下,一个消费组内的多个消费者实例共同消费主题中的消息。RocketMQ 会将主题中的队列平均分配给消费组内的各个消费者实例,以达到负载均衡的目的。

例如,假设有一个主题 TopicA 包含 4 个队列(Queue0Queue1Queue2Queue3),消费组 ConsumerGroup1 中有 2 个消费者实例 Consumer1Consumer2。RocketMQ 会将 Queue0Queue1 分配给 Consumer1,将 Queue2Queue3 分配给 Consumer2。这样,两个消费者实例就可以并行消费消息,提高消费效率。

特点

  1. 负载均衡:消息队列会在消费组内的消费者实例间进行分配,每个实例只消费自己所负责队列中的消息。
  2. 消息不重复:在正常情况下,消费组内不会出现重复消费同一条消息的情况。
  3. 高可用性:消费组内某个消费者实例故障时,其负责的队列会被重新分配给其他实例,保证消息的正常消费。

广播消费

广播消费模式下,主题中的每条消息会被消费组内的所有消费者实例都消费一遍。

例如,同样是主题 TopicA 和消费组 ConsumerGroup1,当有新消息到达时,Consumer1Consumer2 都会收到该消息并进行消费。

特点

  1. 消息广播:消息会发送给消费组内的每一个消费者实例。
  2. 可能重复消费:由于每个实例都消费消息,可能会出现重复消费的情况,需要应用层自行处理去重逻辑。
  3. 适用于特定场景:常用于一些需要所有消费者都知晓的消息场景,如配置更新等。

RocketMQ 负载均衡策略

RocketMQ 的负载均衡策略主要体现在队列分配给消费者实例的过程中。

基于队列的负载均衡

  1. 平均分配策略:这是 RocketMQ 默认的负载均衡策略。它会将主题的队列按照消费者实例的数量进行平均分配。例如,有 10 个队列和 3 个消费者实例,平均分配的结果可能是:实例 1 分配到 4 个队列,实例 2 分配到 3 个队列,实例 3 分配到 3 个队列。计算公式大致为:每个实例分配的队列数 = 队列总数 / 消费者实例数(向下取整),剩余队列再依次分配给各个实例。
  2. 环形分配策略:这种策略将消费者实例和队列看作一个环形结构。从某个起始点开始,依次将队列分配给消费者实例。例如,有 4 个队列(Q0 - Q3)和 3 个消费者实例(C1 - C3),环形分配可能是:C1 分配 Q0,C2 分配 Q1,C3 分配 Q2,C1 再分配 Q3。

动态负载均衡

RocketMQ 支持动态负载均衡,当消费组内有新的消费者实例加入或者现有实例退出时,RocketMQ 会自动重新分配队列。

  1. 实例加入:当新的消费者实例加入消费组时,RocketMQ 会暂停所有消费者实例的消费,重新计算队列分配方案,然后将新的队列分配情况通知给各个实例,最后恢复消费。
  2. 实例退出:如果某个消费者实例因为故障或者主动退出,RocketMQ 同样会暂停消费,重新分配故障实例所负责的队列给其他实例,再恢复消费。

代码示例

以下是使用 Java 语言结合 RocketMQ 的客户端 API 来展示集群消费和广播消费的代码示例。

集群消费代码示例

  1. 引入依赖: 在 pom.xml 文件中添加 RocketMQ 客户端依赖:
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.9.4</version>
</dependency>
  1. 编写消费者代码
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class ClusterConsumer {
    public static void main(String[] args) throws Exception {
        // 创建消费者实例,指定消费组
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup1");
        // 设置 NameServer 地址
        consumer.setNamesrvAddr("localhost:9876");
        // 订阅主题和标签
        consumer.subscribe("TopicA", "TagA");

        // 注册消息监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("Consume from queue: " + msg.getQueueId() + ", message: " + new String(msg.getBody()));
                }
                // 消费成功返回 CONSUME_SUCCESS
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 启动消费者
        consumer.start();
        System.out.println("Cluster Consumer Started.");
    }
}

在上述代码中,我们创建了一个集群消费模式的消费者。通过 DefaultMQPushConsumer 创建消费者实例,并指定消费组 ConsumerGroup1。设置 NameServer 地址后,订阅了主题 TopicA 及其标签 TagA。在消息监听器中,我们简单地打印出消息来自哪个队列以及消息内容。

广播消费代码示例

  1. 广播消费的依赖与集群消费相同:同样在 pom.xml 文件中添加 RocketMQ 客户端依赖:
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.9.4</version>
</dependency>
  1. 编写广播消费的消费者代码
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class BroadcastConsumer {
    public static void main(String[] args) throws Exception {
        // 创建消费者实例,指定消费组
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup1");
        // 设置 NameServer 地址
        consumer.setNamesrvAddr("localhost:9876");
        // 订阅主题和标签
        consumer.subscribe("TopicA", "TagA");
        // 设置为广播消费模式
        consumer.setMessageModel(org.apache.rocketmq.client.producer.MessageModel.BROADCASTING);

        // 注册消息监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("Broadcast Consume, message: " + new String(msg.getBody()));
                }
                // 消费成功返回 CONSUME_SUCCESS
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 启动消费者
        consumer.start();
        System.out.println("Broadcast Consumer Started.");
    }
}

此代码与集群消费代码类似,但通过 consumer.setMessageModel(org.apache.rocketmq.client.producer.MessageModel.BROADCASTING) 将消费模式设置为广播消费。这样,消费组内的每个消费者实例都会收到主题 TopicA 中带有 TagA 标签的所有消息。

负载均衡策略调整代码示例

  1. 自定义负载均衡策略: 要自定义负载均衡策略,需要实现 AllocateMessageQueueStrategy 接口。以下是一个简单的自定义平均分配策略示例:
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.message.MessageQueue;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class CustomAverageAllocateMessageQueueStrategy implements AllocateMessageQueueStrategy {
    @Override
    public String getName() {
        return "CustomAverageAllocate";
    }

    @Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, Map<String, List<String>> cidAll) {
        List<MessageQueue> result = new ArrayList<>();
        int consumerCount = cidAll.get(consumerGroup).size();
        int queueCount = mqAll.size();
        int averageSize = queueCount / consumerCount;
        int remainder = queueCount % consumerCount;
        int index = cidAll.get(consumerGroup).indexOf(currentCID);

        for (int i = 0; i < averageSize; i++) {
            result.add(mqAll.get(index * averageSize + i));
        }
        if (remainder > 0 && index < remainder) {
            result.add(mqAll.get(consumerCount * averageSize + index));
        }
        return result;
    }
}
  1. 使用自定义负载均衡策略: 在消费者代码中应用自定义负载均衡策略:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class CustomLoadBalanceConsumer {
    public static void main(String[] args) throws Exception {
        // 创建消费者实例,指定消费组
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup1");
        // 设置 NameServer 地址
        consumer.setNamesrvAddr("localhost:9876");
        // 订阅主题和标签
        consumer.subscribe("TopicA", "TagA");

        // 设置自定义负载均衡策略
        AllocateMessageQueueStrategy strategy = new CustomAverageAllocateMessageQueueStrategy();
        consumer.setAllocateMessageQueueStrategy(strategy);

        // 注册消息监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("Consume from queue: " + msg.getQueueId() + ", message: " + new String(msg.getBody()));
                }
                // 消费成功返回 CONSUME_SUCCESS
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 启动消费者
        consumer.start();
        System.out.println("Custom Load Balance Consumer Started.");
    }
}

在上述代码中,我们首先实现了一个自定义的平均分配负载均衡策略 CustomAverageAllocateMessageQueueStrategy,然后在消费者代码中通过 consumer.setAllocateMessageQueueStrategy(strategy) 将其应用到消费者实例上。这样,消费者在进行队列分配时就会按照我们自定义的策略进行。

消息消费的可靠性保证

在 RocketMQ 中,为了确保消息消费的可靠性,采取了多种机制。

消费重试

当消费者消费消息失败时,RocketMQ 会自动进行重试。默认情况下,集群消费模式下,消费失败的消息会被发送到重试队列。重试队列的名称格式为 %RETRY%{consumerGroup}

  1. 重试次数:默认重试 16 次,每次重试的时间间隔会逐渐延长。例如,第一次重试间隔 10 秒,第二次 30 秒,第三次 1 分钟等,具体时间间隔可参考 RocketMQ 的源码实现。
  2. 重试逻辑:消费者在消费消息时返回 ConsumeConcurrentlyStatus.RECONSUME_LATER 表示消费失败需要重试。RocketMQ 会将该消息重新投递到重试队列,等待下一次重试。

死信队列

如果消息经过多次重试后仍然消费失败,该消息会被发送到死信队列。死信队列的名称格式为 %DLQ%{consumerGroup}

  1. 死信队列特点:死信队列中的消息不会再被自动重试消费。应用程序需要人工干预,分析死信原因并处理这些消息。
  2. 处理死信消息:可以通过 RocketMQ 的管理工具或者自定义代码从死信队列中获取消息并进行处理。例如,可以将死信消息发送到另一个主题,由专门的消费者进行特殊处理。

消费进度管理

RocketMQ 会记录消费者的消费进度,以便在消费者重启或者故障恢复后能够继续从上次消费的位置开始消费。

本地存储消费进度

  1. 存储方式:消费者会将消费进度存储在本地文件中,文件路径默认为 $HOME/store/commitlog/consumeOffset.json。该文件记录了每个消费组对每个主题中各个队列的消费进度。
  2. 更新机制:消费者在消费消息成功后,会异步将消费进度更新到本地文件。这样即使消费者突然崩溃,下次启动时也能从本地文件中读取上次的消费进度继续消费。

NameServer 存储消费进度

  1. 存储方式:除了本地存储,RocketMQ 还会将消费进度同步到 NameServer。NameServer 会定期持久化这些消费进度信息。
  2. 作用:当消费者实例在不同机器间迁移或者需要跨实例恢复消费进度时,NameServer 存储的消费进度信息就起到了关键作用。消费者可以从 NameServer 获取最新的消费进度,确保消费的连续性。

高并发消费控制

在高并发场景下,合理控制消息消费的并发度对于系统性能和稳定性至关重要。

并发消费设置

  1. 线程池控制:RocketMQ 的 DefaultMQPushConsumer 内部使用线程池来处理消息消费。可以通过 consumer.setConsumeThreadMin(int min)consumer.setConsumeThreadMax(int max) 方法来设置线程池的最小和最大线程数,从而控制并发消费的程度。
  2. 队列并行消费:由于一个消费者实例可能负责多个消息队列,RocketMQ 支持对每个队列进行并行消费。可以通过设置 consumer.setConsumeMessageBatchMaxSize(int size) 来指定每次从队列中拉取并消费的最大消息数量,提高消费效率。

顺序消费保证

在某些场景下,需要保证消息的顺序消费,比如订单处理场景。RocketMQ 支持顺序消费,通过将相关消息发送到同一个队列,消费者按照队列顺序消费消息即可保证顺序。

  1. 生产者保证:生产者在发送消息时,可以通过 MessageQueueSelector 选择将消息发送到特定队列。例如,根据订单 ID 的哈希值选择队列,保证同一订单的消息都在同一个队列中。
  2. 消费者保证:消费者在消费时,设置 consumer.setMessageListener(new MessageListenerOrderly()),并在监听器中按照顺序处理消息。这样可以确保同一队列中的消息按照发送顺序依次被消费。

总结

RocketMQ 的消息消费模型和负载均衡策略为开发者提供了灵活且强大的消息处理能力。通过深入理解集群消费和广播消费模式,以及合理应用负载均衡策略,开发者可以构建高效、可靠的消息驱动应用程序。同时,消费重试、死信队列、消费进度管理和高并发消费控制等机制进一步保证了消息消费的可靠性和稳定性。在实际应用中,需要根据业务场景的特点,选择合适的消费模型和配置参数,以充分发挥 RocketMQ 的性能优势。

以上就是关于 RocketMQ 消息消费模型与负载均衡策略的详细介绍,希望能帮助开发者更好地使用 RocketMQ 进行后端开发。