MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

RocketMQ 如何保障消息的可靠性投递

2022-12-196.2k 阅读

RocketMQ 消息可靠性概述

在分布式系统中,消息的可靠投递至关重要。RocketMQ作为一款高性能、高可靠的消息队列,为保障消息的可靠性投递提供了一系列机制。消息的可靠性投递意味着消息从生产者发送到消息队列,再由消息队列投递到消费者的整个过程中,不丢失、不重复且按照预期的顺序被处理。

RocketMQ 从生产者、消息队列自身以及消费者三个层面来保障消息的可靠性。接下来我们将详细探讨每个层面的具体实现方式。

生产者保障消息可靠性

同步发送与异步发送

  1. 同步发送
    • 同步发送是指生产者发送消息后,会等待 Broker 返回确认响应,只有在收到成功响应后,才会继续执行后续代码。这种方式确保了消息发送的可靠性,因为只有发送成功才会继续执行,但会阻塞当前线程,影响系统的并发性能。
    • 代码示例如下:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class SyncProducer {
    public static void main(String[] args) throws Exception {
        // 创建生产者实例
        DefaultMQProducer producer = new DefaultMQProducer("sync_producer_group");
        // 设置 NameServer 地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动生产者
        producer.start();

        for (int i = 0; i < 10; i++) {
            // 创建消息实例
            Message message = new Message("sync_topic", "TagA", ("Hello RocketMQ " + i).getBytes("UTF-8"));
            // 同步发送消息
            SendResult sendResult = producer.send(message);
            System.out.printf("%s%n", sendResult);
        }

        // 关闭生产者
        producer.shutdown();
    }
}
  1. 异步发送
    • 异步发送则是生产者发送消息后,不会等待 Broker 的响应,而是直接返回,通过回调函数来处理发送结果。这样可以提高系统的并发性能,适用于对响应时间要求较高的场景。
    • 代码示例如下:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("async_producer_group");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        for (int i = 0; i < 10; i++) {
            final int index = i;
            Message message = new Message("async_topic", "TagA", ("Hello RocketMQ " + index).getBytes("UTF-8"));
            producer.send(message, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.printf("异步发送成功,消息索引:%d,结果:%s%n", index, sendResult);
                }

                @Override
                public void onException(Throwable e) {
                    System.out.printf("异步发送失败,消息索引:%d,异常:%s%n", index, e);
                }
            });
        }

        Thread.sleep(1000 * 5);
        producer.shutdown();
    }
}

消息重试机制

  1. 自动重试
    • RocketMQ 生产者在发送消息失败时,会自动进行重试。默认情况下,同步发送失败会重试 2 次,异步发送失败会重试 3 次。这一机制大大提高了消息发送成功的概率。
    • 例如,当网络抖动等临时故障导致消息发送失败时,生产者会在一定时间间隔后重新尝试发送,直到达到最大重试次数。
  2. 自定义重试策略
    • 开发者也可以根据业务需求自定义重试策略。通过设置 DefaultMQProducerretryTimesWhenSendFailed 属性,可以调整同步发送的重试次数;通过设置 retryTimesWhenSendAsyncFailed 属性,可以调整异步发送的重试次数。
    • 示例代码如下:
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
producer.setNamesrvAddr("localhost:9876");
// 设置同步发送失败重试次数为 3 次
producer.setRetryTimesWhenSendFailed(3);
// 设置异步发送失败重试次数为 5 次
producer.setRetryTimesWhenSendAsyncFailed(5);
producer.start();

事务消息

  1. 事务消息原理
    • RocketMQ 提供了事务消息功能,用于确保消息发送与本地业务操作的原子性。其实现原理是将消息发送过程分为两个阶段:半消息发送阶段和消息提交/回滚阶段。
    • 在半消息发送阶段,生产者向 Broker 发送半消息(此时消息对消费者不可见),Broker 返回成功响应后,生产者执行本地事务。根据本地事务的执行结果,生产者再向 Broker 发送提交或回滚消息。如果本地事务执行成功,Broker 将半消息标记为可投递状态,消费者可以消费该消息;如果本地事务执行失败,Broker 将删除半消息。
  2. 代码示例
    • 生产者代码:
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class TransactionProducer {
    public static void main(String[] args) throws Exception {
        TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_group");
        producer.setNamesrvAddr("localhost:9876");

        TransactionListener transactionListener = new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                // 执行本地业务逻辑
                System.out.println("执行本地事务,消息内容:" + new String(msg.getBody()));
                // 模拟本地事务成功
                return LocalTransactionState.COMMIT_MESSAGE;
                // 模拟本地事务失败,返回 LocalTransactionState.ROLLBACK_MESSAGE;
                // 模拟本地事务状态未知,返回 LocalTransactionState.UNKNOW;
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                // 检查本地事务状态
                System.out.println("检查本地事务,消息内容:" + new String(msg.getBody()));
                // 模拟本地事务成功
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        };

        producer.setTransactionListener(transactionListener);
        producer.start();

        Message message = new Message("transaction_topic", "TagA", "事务消息测试".getBytes(RemotingHelper.DEFAULT_CHARSET));
        SendResult sendResult = producer.sendMessageInTransaction(message, null);
        System.out.printf("%s%n", sendResult);

        Thread.sleep(1000 * 5);
        producer.shutdown();
    }
}
- 消费者代码:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class TransactionConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction_consumer_group");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("transaction_topic", "*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("消费事务消息:" + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("事务消费者启动成功");
    }
}

消息队列保障消息可靠性

主从架构与数据复制

  1. 主从架构原理
    • RocketMQ 使用主从架构来保障消息的可靠性。每个 Topic 由多个 Message Queue 组成,每个 Message Queue 又分为 Master 节点和 Slave 节点。Master 节点负责处理读写请求,Slave 节点则从 Master 节点同步数据。
    • 当 Master 节点出现故障时,Slave 节点可以切换为 Master 节点继续提供服务,从而保证消息的不丢失。
  2. 数据复制方式
    • RocketMQ 支持同步复制和异步复制两种数据复制方式。
    • 同步复制:Master 节点在接收到消息后,会等待所有 Slave 节点都成功复制该消息后,才向生产者返回成功响应。这种方式确保了消息在 Master 和 Slave 节点上的一致性,但会增加消息发送的延迟。
    • 异步复制:Master 节点在接收到消息后,立即向生产者返回成功响应,然后异步将消息复制到 Slave 节点。这种方式提高了消息发送的性能,但在 Master 节点故障时,可能会丢失少量未复制到 Slave 节点的消息。
    • 可以通过在 Broker 配置文件中设置 brokerRole 属性来指定复制方式,例如:
<brokerRole>SYNC_MASTER</brokerRole>  <!-- 同步复制 -->
<brokerRole>ASYNC_MASTER</brokerRole> <!-- 异步复制 -->

刷盘机制

  1. 同步刷盘
    • 同步刷盘是指 Broker 在接收到消息后,会立即将消息写入磁盘,只有在写入成功后才向生产者返回成功响应。这种方式确保了消息在 Broker 故障时不会丢失,但会降低消息写入的性能。
    • 在 Broker 配置文件中,可以通过设置 flushDiskType 属性为 SYNC_FLUSH 来启用同步刷盘:
<flushDiskType>SYNC_FLUSH</flushDiskType>
  1. 异步刷盘
    • 异步刷盘则是 Broker 在接收到消息后,先将消息写入内存的 PageCache,然后由后台线程异步将 PageCache 中的数据刷入磁盘。这种方式提高了消息写入的性能,但在 Broker 突然断电等情况下,可能会丢失少量未刷入磁盘的消息。
    • 在 Broker 配置文件中,通过设置 flushDiskType 属性为 ASYNC_FLUSH 来启用异步刷盘:
<flushDiskType>ASYNC_FLUSH</flushDiskType>

高可用机制

  1. Namesrv 高可用
    • RocketMQ 的 NameServer 是一个轻量级的注册中心,采用无状态的集群部署方式。每个 NameServer 节点之间相互独立,不进行数据同步。生产者和消费者通过轮询的方式从 NameServer 集群中获取 Broker 的地址信息。
    • 当某个 NameServer 节点出现故障时,生产者和消费者可以自动切换到其他可用的 NameServer 节点,从而保证系统的可用性。
  2. Broker 高可用
    • 如前文所述,Broker 通过主从架构和数据复制来实现高可用。此外,RocketMQ 还支持多 Broker 集群部署,通过合理的负载均衡策略,将消息请求均匀分配到各个 Broker 节点上,避免单个 Broker 节点压力过大。
    • 当某个 Broker 节点出现故障时,生产者可以自动感知并将消息发送到其他可用的 Broker 节点,消费者也可以从其他 Broker 节点获取消息进行消费,确保消息的正常投递和消费。

消费者保障消息可靠性

消费模式

  1. 集群消费
    • 集群消费模式下,多个消费者实例共同消费一个 Topic 的消息。每个消费者实例只消费消息的一部分,通过负载均衡算法将消息分配到各个消费者实例上。
    • 这种模式适用于处理大规模消息的场景,提高了消费的并行度。例如,在电商订单处理系统中,多个消费者可以同时处理不同的订单消息,加快订单处理速度。
    • 代码示例如下:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class ClusterConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cluster_consumer_group");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("cluster_topic", "*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("集群消费消息:" + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("集群消费者启动成功");
    }
}
  1. 广播消费
    • 广播消费模式下,每个消费者实例都会接收到 Topic 的所有消息。这种模式适用于需要所有消费者都处理相同消息的场景,例如系统配置更新通知等。
    • 代码示例如下:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class BroadcastConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("broadcast_consumer_group");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.subscribe("broadcast_topic", "*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("广播消费消息:" + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("广播消费者启动成功");
    }
}

消费重试

  1. 自动重试
    • 当消费者消费消息失败时,RocketMQ 会自动进行重试。默认情况下,消费者会重试 16 次,每次重试的时间间隔会逐渐延长。这一机制确保了因临时故障导致的消费失败能够得到处理。
    • 例如,当消费者处理消息时依赖的外部服务暂时不可用,导致消费失败,通过重试机制,消费者可以在一定时间后再次尝试消费,提高消息消费的成功率。
  2. 死信队列
    • 如果消息经过多次重试后仍然消费失败,RocketMQ 会将该消息发送到死信队列(DLQ,Dead - Letter Queue)。死信队列是一个特殊的 Topic,用于存储消费失败的消息。
    • 开发者可以通过监听死信队列,对这些消费失败的消息进行人工干预,例如检查消息内容、调整消费逻辑等,以确保消息最终能够被成功消费。
    • 示例代码如下,首先创建死信队列消费者:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class DeadLetterQueueConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("dead_letter_consumer_group");
        consumer.setNamesrvAddr("localhost:9876");
        // 订阅死信队列 Topic,死信队列 Topic 名称为 %DLQ% + 原消费组名称
        consumer.subscribe("%DLQ%cluster_consumer_group", "*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("消费死信队列消息:" + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("死信队列消费者启动成功");
    }
}

幂等性处理

  1. 幂等性概念
    • 幂等性是指对同一操作的多次请求应该产生相同的结果。在消息消费场景中,由于网络波动等原因,可能会出现消息重复消费的情况。为了确保业务数据的一致性,消费者需要具备幂等性处理能力。
  2. 实现方式
    • 数据库唯一约束:在数据库表中设置唯一索引,当消费者处理消息时,将消息中的关键信息作为唯一索引的字段。例如,在订单处理场景中,可以将订单号作为唯一索引字段。当重复消费相同订单消息时,数据库插入操作会因为唯一约束而失败,从而避免重复处理订单。
    • 状态机控制:维护业务对象的状态机,只有在特定状态下才处理消息。例如,订单状态为“未支付”时,才处理支付消息;当订单状态已经是“已支付”时,忽略重复的支付消息。
    • 消息唯一标识:在消息中添加唯一标识,消费者在处理消息前,先检查该唯一标识是否已经处理过。可以使用 UUID 等方式生成唯一标识,将其存储在缓存中(如 Redis),每次处理消息时查询缓存,若已存在则表明是重复消息,不再处理。

通过以上从生产者、消息队列和消费者三个层面的一系列机制,RocketMQ 能够有效地保障消息的可靠性投递,满足各种复杂分布式系统的消息传递需求。在实际应用中,开发者需要根据具体业务场景,合理配置和使用这些机制,以实现高效、可靠的消息处理。