MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

RocketMQ事务消息原理与应用

2021-01-113.0k 阅读

1. RocketMQ事务消息概述

在分布式系统中,常常会涉及到多个操作的原子性,例如在电商场景下,下单操作既要在订单库中插入订单记录,又要在库存库中扣减库存,这两个操作必须要么都成功,要么都失败。传统数据库通过本地事务能很好地保证单机操作的原子性,但在分布式环境下就需要引入分布式事务解决方案。RocketMQ的事务消息就是其中一种有效的手段。

RocketMQ的事务消息可以理解为一种特殊的消息,它允许消息发送者在本地事务执行前先向RocketMQ发送一条半消息(Half Message)。半消息意味着该消息对消费者是不可见的。当发送者本地事务执行完成后,再根据事务执行结果向RocketMQ发送Commit或Rollback指令,以决定这条半消息是最终提交给消费者,还是回滚删除。

2. RocketMQ事务消息原理

2.1 事务消息发送流程

  1. 发送半消息:消息发送者向RocketMQ发送半消息,RocketMQ接收到半消息后,将其存储到特殊的队列中(这个队列对消费者不可见),并返回一个发送成功的响应给发送者。此时消息处于“Prepared”状态。
  2. 执行本地事务:消息发送者在接收到半消息发送成功的响应后,执行本地事务。本地事务的执行逻辑由业务代码决定,例如可能是数据库的增删改操作等。
  3. 提交或回滚事务:根据本地事务的执行结果,消息发送者向RocketMQ发送Commit或Rollback指令。如果本地事务执行成功,发送Commit指令,RocketMQ将半消息从特殊队列转移到正常队列,使其对消费者可见;如果本地事务执行失败,发送Rollback指令,RocketMQ将删除半消息。

2.2 事务状态回查机制

在实际应用中,可能会出现发送者发送Commit或Rollback指令失败的情况(例如网络异常等)。为了确保事务消息的最终一致性,RocketMQ引入了事务状态回查机制。

  1. 回查触发:RocketMQ会定期扫描处于“Prepared”状态时间超过一定阈值的半消息,并向消息发送者发起事务状态回查请求。
  2. 回查处理:消息发送者接收到回查请求后,根据本地事务的实际执行状态,再次向RocketMQ发送Commit或Rollback指令。这样,即使之前的提交或回滚指令丢失,通过回查机制也能保证事务消息的最终状态正确。

3. RocketMQ事务消息应用场景

3.1 电商订单场景

  1. 下单与库存扣减:当用户下单时,首先发送一条事务消息,其中包含订单信息和库存扣减信息。发送半消息成功后,在本地数据库插入订单记录,然后执行库存扣减操作。如果库存扣减成功,提交事务消息,消费者就能收到订单消息并进行后续处理(如发货等);如果库存扣减失败,回滚事务消息,订单不会生效,库存也不会被误扣。
  2. 防止重复下单:通过事务消息的特性,在插入订单记录前发送半消息,若重复下单,半消息会因为幂等性不会重复发送成功,从而避免重复订单。

3.2 银行转账场景

在银行系统中,A账户向B账户转账。发送包含转账信息的事务消息,发送半消息成功后,在本地数据库执行A账户扣款操作,若扣款成功,提交事务消息,B账户收到转账消息后进行入账操作;若扣款失败,回滚事务消息,转账不会生效。

4. RocketMQ事务消息代码示例

以下以Java语言为例,展示如何使用RocketMQ发送和接收事务消息。

4.1 引入依赖

在Maven项目的pom.xml文件中添加RocketMQ相关依赖:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.9.4</version>
</dependency>

4.2 发送事务消息

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.transaction.LocalTransactionState;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.List;
import java.util.concurrent.*;

public class TransactionMsgProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        // 创建事务消息生产者
        TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_group");
        producer.setNamesrvAddr("localhost:9876");

        // 设置线程池来处理本地事务和事务状态回查
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });

        producer.setExecutorService(executorService);
        producer.setTransactionExecuter(new LocalTransactionExecuter() {
            @Override
            public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
                // 执行本地事务逻辑
                System.out.println("执行本地事务,消息内容:" + new String(msg.getBody()));
                // 模拟本地事务成功
                return LocalTransactionState.COMMIT_MESSAGE;
                // 模拟本地事务失败,返回LocalTransactionState.ROLLBACK_MESSAGE
            }
        });

        producer.start();

        // 发送事务消息
        Message message = new Message("TransactionTopic", "TagA", "KEY1", "事务消息内容".getBytes(RemotingHelper.DEFAULT_CHARSET));
        TransactionSendResult sendResult = producer.sendMessageInTransaction(message, null);
        System.out.println("发送事务消息结果:" + sendResult);

        TimeUnit.SECONDS.sleep(10);
        producer.shutdown();
    }
}

4.3 接收事务消息

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class TransactionMsgConsumer {
    public static void main(String[] args) throws MQClientException {
        // 创建消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction_consumer_group");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TransactionTopic", "*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("收到事务消息:" + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("事务消息消费者启动成功");
    }
}

在上述代码中,发送端通过TransactionMQProducer发送事务消息,在executeLocalTransactionBranch方法中模拟了本地事务的执行。接收端通过DefaultMQPushConsumer接收事务消息并进行处理。

5. RocketMQ事务消息使用注意事项

  1. 本地事务的幂等性:由于可能会收到事务状态回查请求,本地事务的执行逻辑必须保证幂等性,即多次执行相同的本地事务操作,结果应该是一致的。例如在扣库存操作中,需要防止重复扣减。
  2. 回查性能:事务状态回查机制虽然保证了最终一致性,但频繁的回查可能会对系统性能产生影响。因此,需要合理设置回查阈值,并且优化回查处理逻辑。
  3. 消息顺序性:在事务消息场景下,如果需要保证消息的顺序性,要注意在发送和接收端进行相应的配置和处理。例如在发送端使用有序消息的发送方式,在接收端使用顺序消费的模式。
  4. 异常处理:在发送半消息、执行本地事务、提交或回滚事务以及事务状态回查等各个环节,都可能出现异常。需要在代码中妥善处理这些异常情况,确保系统的稳定性和可靠性。

6. 事务消息与其他分布式事务方案对比

  1. XA 事务:XA 事务是一种基于数据库的分布式事务解决方案,它通过数据库的两阶段提交(2PC)协议来保证事务的一致性。与 RocketMQ 事务消息相比,XA 事务依赖于数据库的支持,对数据库性能影响较大,并且在跨数据库、跨服务场景下灵活性较差。而 RocketMQ 事务消息基于消息队列,适用于更广泛的分布式场景,对业务侵入性相对较小。
  2. TCC(Try - Confirm - Cancel)模式:TCC 模式是一种应用层的分布式事务解决方案,它将事务分为 Try、Confirm 和 Cancel 三个阶段。与 RocketMQ 事务消息相比,TCC 模式对业务代码的侵入性较大,需要在业务逻辑中实现 Try、Confirm 和 Cancel 三个操作。而 RocketMQ 事务消息主要在消息发送和消费环节处理事务,对业务代码的侵入相对集中在消息发送端的本地事务执行部分。
  3. Saga 模式:Saga 模式是通过一系列本地事务的顺序执行来实现分布式事务,当其中某个本地事务失败时,通过补偿事务来撤销之前已经执行的事务。RocketMQ 事务消息主要针对单个消息发送和本地事务的一致性,而 Saga 模式更适合处理长流程、多步骤的分布式事务场景。但 Saga 模式的协调和补偿逻辑相对复杂,而 RocketMQ 事务消息相对简单直观,适用于对一致性要求较高且业务逻辑相对集中的场景。

7. RocketMQ事务消息在复杂业务中的优化策略

  1. 批量事务消息:在一些业务场景中,如果存在多个相关的本地事务操作,可以考虑使用批量事务消息。例如在电商的批量下单场景下,一次发送包含多个订单信息的事务消息,在本地事务中批量处理订单插入和库存扣减等操作。这样可以减少消息发送次数,提高系统性能。但需要注意的是,批量事务消息的本地事务执行逻辑要保证原子性,并且在处理回查和异常情况时要更加谨慎。
  2. 异步处理本地事务:在发送半消息成功后,可以将本地事务的执行放到一个异步线程池中进行处理。这样可以避免发送端因为本地事务执行时间过长而阻塞,提高系统的并发处理能力。同时,要确保异步处理本地事务时的线程安全性和数据一致性。
  3. 优化回查逻辑:由于事务状态回查可能会对系统性能产生一定影响,可以通过优化回查逻辑来降低这种影响。例如在回查时,可以先通过缓存等方式快速判断本地事务的状态,减少对数据库等持久化存储的查询次数。并且可以根据业务特点,合理调整回查的频率和阈值。
  4. 结合其他技术:在复杂业务中,可以将 RocketMQ 事务消息与其他技术相结合。例如结合分布式缓存来提高数据访问速度,结合分布式日志来记录事务执行过程,便于问题排查和系统监控。同时,也可以结合分布式锁来保证在高并发场景下本地事务的原子性和一致性。

8. RocketMQ事务消息在微服务架构中的应用实践

在微服务架构中,各个微服务之间通过接口进行通信,数据的一致性问题变得更加复杂。RocketMQ 事务消息可以有效地解决微服务之间的分布式事务问题。

  1. 订单微服务与库存微服务:当用户在订单微服务下单时,订单微服务发送包含订单和库存扣减信息的事务消息。在本地事务中,订单微服务插入订单记录。如果本地事务成功,提交事务消息,库存微服务接收到消息后扣减库存。如果订单微服务本地事务失败,回滚事务消息,库存微服务不会收到扣减库存的消息。
  2. 跨微服务的复杂业务流程:例如在一个电商促销活动中,涉及到订单微服务、库存微服务、积分微服务和营销微服务等多个微服务。订单微服务发送事务消息,在本地事务中处理订单相关操作,然后根据事务结果提交或回滚消息。其他微服务根据接收到的消息执行相应的积分增加、营销活动记录等操作。通过 RocketMQ 事务消息,可以保证整个复杂业务流程中各个微服务操作的一致性。

在实际应用中,需要注意各个微服务之间的接口兼容性和数据格式一致性,同时要合理配置 RocketMQ 的参数,以满足微服务架构下高并发、低延迟的要求。

9. RocketMQ事务消息的监控与运维

  1. 消息状态监控:通过 RocketMQ 提供的控制台或自定义监控工具,实时监控事务消息的发送、提交、回滚以及回查等状态。例如可以统计不同状态的事务消息数量,及时发现异常情况,如大量处于“Prepared”状态且长时间未处理的消息,可能意味着本地事务执行或回查出现问题。
  2. 性能监控:监控事务消息的发送和处理性能指标,如消息发送延迟、本地事务执行时间、回查响应时间等。通过性能监控,可以及时发现性能瓶颈,例如本地事务执行时间过长导致消息发送延迟,从而针对性地进行优化。
  3. 异常处理与报警:在系统中设置异常处理机制,当事务消息发送、本地事务执行或回查等过程中出现异常时,及时记录异常信息,并通过邮件、短信等方式发送报警通知给运维人员。运维人员可以根据异常信息快速定位问题并进行修复。
  4. 数据备份与恢复:定期对 RocketMQ 存储的事务消息数据进行备份,以防数据丢失。在出现数据丢失或异常情况时,能够通过备份数据进行恢复,保证系统的可用性和数据一致性。同时,要定期进行数据恢复演练,确保备份数据的有效性和恢复流程的正确性。