RocketMQ 如何保障消息的可靠性投递
2022-12-196.2k 阅读
RocketMQ 消息可靠性概述
在分布式系统中,消息的可靠投递至关重要。RocketMQ作为一款高性能、高可靠的消息队列,为保障消息的可靠性投递提供了一系列机制。消息的可靠性投递意味着消息从生产者发送到消息队列,再由消息队列投递到消费者的整个过程中,不丢失、不重复且按照预期的顺序被处理。
RocketMQ 从生产者、消息队列自身以及消费者三个层面来保障消息的可靠性。接下来我们将详细探讨每个层面的具体实现方式。
生产者保障消息可靠性
同步发送与异步发送
- 同步发送
- 同步发送是指生产者发送消息后,会等待 Broker 返回确认响应,只有在收到成功响应后,才会继续执行后续代码。这种方式确保了消息发送的可靠性,因为只有发送成功才会继续执行,但会阻塞当前线程,影响系统的并发性能。
- 代码示例如下:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class SyncProducer {
public static void main(String[] args) throws Exception {
// 创建生产者实例
DefaultMQProducer producer = new DefaultMQProducer("sync_producer_group");
// 设置 NameServer 地址
producer.setNamesrvAddr("localhost:9876");
// 启动生产者
producer.start();
for (int i = 0; i < 10; i++) {
// 创建消息实例
Message message = new Message("sync_topic", "TagA", ("Hello RocketMQ " + i).getBytes("UTF-8"));
// 同步发送消息
SendResult sendResult = producer.send(message);
System.out.printf("%s%n", sendResult);
}
// 关闭生产者
producer.shutdown();
}
}
- 异步发送
- 异步发送则是生产者发送消息后,不会等待 Broker 的响应,而是直接返回,通过回调函数来处理发送结果。这样可以提高系统的并发性能,适用于对响应时间要求较高的场景。
- 代码示例如下:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class AsyncProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("async_producer_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i < 10; i++) {
final int index = i;
Message message = new Message("async_topic", "TagA", ("Hello RocketMQ " + index).getBytes("UTF-8"));
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("异步发送成功,消息索引:%d,结果:%s%n", index, sendResult);
}
@Override
public void onException(Throwable e) {
System.out.printf("异步发送失败,消息索引:%d,异常:%s%n", index, e);
}
});
}
Thread.sleep(1000 * 5);
producer.shutdown();
}
}
消息重试机制
- 自动重试
- RocketMQ 生产者在发送消息失败时,会自动进行重试。默认情况下,同步发送失败会重试 2 次,异步发送失败会重试 3 次。这一机制大大提高了消息发送成功的概率。
- 例如,当网络抖动等临时故障导致消息发送失败时,生产者会在一定时间间隔后重新尝试发送,直到达到最大重试次数。
- 自定义重试策略
- 开发者也可以根据业务需求自定义重试策略。通过设置
DefaultMQProducer
的retryTimesWhenSendFailed
属性,可以调整同步发送的重试次数;通过设置retryTimesWhenSendAsyncFailed
属性,可以调整异步发送的重试次数。 - 示例代码如下:
- 开发者也可以根据业务需求自定义重试策略。通过设置
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
producer.setNamesrvAddr("localhost:9876");
// 设置同步发送失败重试次数为 3 次
producer.setRetryTimesWhenSendFailed(3);
// 设置异步发送失败重试次数为 5 次
producer.setRetryTimesWhenSendAsyncFailed(5);
producer.start();
事务消息
- 事务消息原理
- RocketMQ 提供了事务消息功能,用于确保消息发送与本地业务操作的原子性。其实现原理是将消息发送过程分为两个阶段:半消息发送阶段和消息提交/回滚阶段。
- 在半消息发送阶段,生产者向 Broker 发送半消息(此时消息对消费者不可见),Broker 返回成功响应后,生产者执行本地事务。根据本地事务的执行结果,生产者再向 Broker 发送提交或回滚消息。如果本地事务执行成功,Broker 将半消息标记为可投递状态,消费者可以消费该消息;如果本地事务执行失败,Broker 将删除半消息。
- 代码示例
- 生产者代码:
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class TransactionProducer {
public static void main(String[] args) throws Exception {
TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_group");
producer.setNamesrvAddr("localhost:9876");
TransactionListener transactionListener = new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 执行本地业务逻辑
System.out.println("执行本地事务,消息内容:" + new String(msg.getBody()));
// 模拟本地事务成功
return LocalTransactionState.COMMIT_MESSAGE;
// 模拟本地事务失败,返回 LocalTransactionState.ROLLBACK_MESSAGE;
// 模拟本地事务状态未知,返回 LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 检查本地事务状态
System.out.println("检查本地事务,消息内容:" + new String(msg.getBody()));
// 模拟本地事务成功
return LocalTransactionState.COMMIT_MESSAGE;
}
};
producer.setTransactionListener(transactionListener);
producer.start();
Message message = new Message("transaction_topic", "TagA", "事务消息测试".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(message, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(1000 * 5);
producer.shutdown();
}
}
- 消费者代码:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class TransactionConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction_consumer_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("transaction_topic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("消费事务消息:" + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("事务消费者启动成功");
}
}
消息队列保障消息可靠性
主从架构与数据复制
- 主从架构原理
- RocketMQ 使用主从架构来保障消息的可靠性。每个 Topic 由多个 Message Queue 组成,每个 Message Queue 又分为 Master 节点和 Slave 节点。Master 节点负责处理读写请求,Slave 节点则从 Master 节点同步数据。
- 当 Master 节点出现故障时,Slave 节点可以切换为 Master 节点继续提供服务,从而保证消息的不丢失。
- 数据复制方式
- RocketMQ 支持同步复制和异步复制两种数据复制方式。
- 同步复制:Master 节点在接收到消息后,会等待所有 Slave 节点都成功复制该消息后,才向生产者返回成功响应。这种方式确保了消息在 Master 和 Slave 节点上的一致性,但会增加消息发送的延迟。
- 异步复制:Master 节点在接收到消息后,立即向生产者返回成功响应,然后异步将消息复制到 Slave 节点。这种方式提高了消息发送的性能,但在 Master 节点故障时,可能会丢失少量未复制到 Slave 节点的消息。
- 可以通过在 Broker 配置文件中设置
brokerRole
属性来指定复制方式,例如:
<brokerRole>SYNC_MASTER</brokerRole> <!-- 同步复制 -->
<brokerRole>ASYNC_MASTER</brokerRole> <!-- 异步复制 -->
刷盘机制
- 同步刷盘
- 同步刷盘是指 Broker 在接收到消息后,会立即将消息写入磁盘,只有在写入成功后才向生产者返回成功响应。这种方式确保了消息在 Broker 故障时不会丢失,但会降低消息写入的性能。
- 在 Broker 配置文件中,可以通过设置
flushDiskType
属性为SYNC_FLUSH
来启用同步刷盘:
<flushDiskType>SYNC_FLUSH</flushDiskType>
- 异步刷盘
- 异步刷盘则是 Broker 在接收到消息后,先将消息写入内存的 PageCache,然后由后台线程异步将 PageCache 中的数据刷入磁盘。这种方式提高了消息写入的性能,但在 Broker 突然断电等情况下,可能会丢失少量未刷入磁盘的消息。
- 在 Broker 配置文件中,通过设置
flushDiskType
属性为ASYNC_FLUSH
来启用异步刷盘:
<flushDiskType>ASYNC_FLUSH</flushDiskType>
高可用机制
- Namesrv 高可用
- RocketMQ 的 NameServer 是一个轻量级的注册中心,采用无状态的集群部署方式。每个 NameServer 节点之间相互独立,不进行数据同步。生产者和消费者通过轮询的方式从 NameServer 集群中获取 Broker 的地址信息。
- 当某个 NameServer 节点出现故障时,生产者和消费者可以自动切换到其他可用的 NameServer 节点,从而保证系统的可用性。
- Broker 高可用
- 如前文所述,Broker 通过主从架构和数据复制来实现高可用。此外,RocketMQ 还支持多 Broker 集群部署,通过合理的负载均衡策略,将消息请求均匀分配到各个 Broker 节点上,避免单个 Broker 节点压力过大。
- 当某个 Broker 节点出现故障时,生产者可以自动感知并将消息发送到其他可用的 Broker 节点,消费者也可以从其他 Broker 节点获取消息进行消费,确保消息的正常投递和消费。
消费者保障消息可靠性
消费模式
- 集群消费
- 集群消费模式下,多个消费者实例共同消费一个 Topic 的消息。每个消费者实例只消费消息的一部分,通过负载均衡算法将消息分配到各个消费者实例上。
- 这种模式适用于处理大规模消息的场景,提高了消费的并行度。例如,在电商订单处理系统中,多个消费者可以同时处理不同的订单消息,加快订单处理速度。
- 代码示例如下:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class ClusterConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cluster_consumer_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("cluster_topic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("集群消费消息:" + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("集群消费者启动成功");
}
}
- 广播消费
- 广播消费模式下,每个消费者实例都会接收到 Topic 的所有消息。这种模式适用于需要所有消费者都处理相同消息的场景,例如系统配置更新通知等。
- 代码示例如下:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class BroadcastConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("broadcast_consumer_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.subscribe("broadcast_topic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("广播消费消息:" + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("广播消费者启动成功");
}
}
消费重试
- 自动重试
- 当消费者消费消息失败时,RocketMQ 会自动进行重试。默认情况下,消费者会重试 16 次,每次重试的时间间隔会逐渐延长。这一机制确保了因临时故障导致的消费失败能够得到处理。
- 例如,当消费者处理消息时依赖的外部服务暂时不可用,导致消费失败,通过重试机制,消费者可以在一定时间后再次尝试消费,提高消息消费的成功率。
- 死信队列
- 如果消息经过多次重试后仍然消费失败,RocketMQ 会将该消息发送到死信队列(DLQ,Dead - Letter Queue)。死信队列是一个特殊的 Topic,用于存储消费失败的消息。
- 开发者可以通过监听死信队列,对这些消费失败的消息进行人工干预,例如检查消息内容、调整消费逻辑等,以确保消息最终能够被成功消费。
- 示例代码如下,首先创建死信队列消费者:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class DeadLetterQueueConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("dead_letter_consumer_group");
consumer.setNamesrvAddr("localhost:9876");
// 订阅死信队列 Topic,死信队列 Topic 名称为 %DLQ% + 原消费组名称
consumer.subscribe("%DLQ%cluster_consumer_group", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("消费死信队列消息:" + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("死信队列消费者启动成功");
}
}
幂等性处理
- 幂等性概念
- 幂等性是指对同一操作的多次请求应该产生相同的结果。在消息消费场景中,由于网络波动等原因,可能会出现消息重复消费的情况。为了确保业务数据的一致性,消费者需要具备幂等性处理能力。
- 实现方式
- 数据库唯一约束:在数据库表中设置唯一索引,当消费者处理消息时,将消息中的关键信息作为唯一索引的字段。例如,在订单处理场景中,可以将订单号作为唯一索引字段。当重复消费相同订单消息时,数据库插入操作会因为唯一约束而失败,从而避免重复处理订单。
- 状态机控制:维护业务对象的状态机,只有在特定状态下才处理消息。例如,订单状态为“未支付”时,才处理支付消息;当订单状态已经是“已支付”时,忽略重复的支付消息。
- 消息唯一标识:在消息中添加唯一标识,消费者在处理消息前,先检查该唯一标识是否已经处理过。可以使用 UUID 等方式生成唯一标识,将其存储在缓存中(如 Redis),每次处理消息时查询缓存,若已存在则表明是重复消息,不再处理。
通过以上从生产者、消息队列和消费者三个层面的一系列机制,RocketMQ 能够有效地保障消息的可靠性投递,满足各种复杂分布式系统的消息传递需求。在实际应用中,开发者需要根据具体业务场景,合理配置和使用这些机制,以实现高效、可靠的消息处理。