MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

RocketMQ事务消息的设计与实现

2024-02-272.3k 阅读

RocketMQ 事务消息概述

在分布式系统中,数据一致性是一个至关重要的问题。许多业务场景需要保证多个操作要么全部成功,要么全部失败,这就涉及到分布式事务。RocketMQ 提供了事务消息功能,以帮助开发者在分布式环境下实现可靠的事务处理。

事务消息允许应用程序将消息发送和本地事务处理进行绑定,确保消息的发送与本地事务的执行具有原子性。如果本地事务执行成功,消息将被正常投递;如果本地事务执行失败,消息将不会被投递或者被回滚。

事务消息的设计理念

RocketMQ 的事务消息设计基于两阶段提交(2PC)的思想,但又针对消息队列的特点进行了优化。

第一阶段:Half 消息

当生产者发送事务消息时,首先会发送一个 Half 消息到 RocketMQ 服务器。Half 消息对消费者是不可见的,它仅代表消息的存储成功,但未确定最终状态。此时,RocketMQ 会返回一个成功的响应给生产者,告知 Half 消息已接收。

第二阶段:事务状态回查

生产者在发送 Half 消息成功后,会执行本地事务。本地事务执行完成后,生产者需要向 RocketMQ 提交事务状态,即事务是提交(COMMIT)还是回滚(ROLLBACK)。如果生产者在一定时间内没有向 RocketMQ 提交事务状态,RocketMQ 会主动回查生产者,询问该事务的状态。

通过这种设计,RocketMQ 确保了即使在生产者出现故障或网络问题的情况下,事务消息的最终状态也能被正确确定,从而保证消息的可靠投递与事务的一致性。

事务消息的实现原理

消息存储结构

RocketMQ 在存储事务消息时,会将 Half 消息和最终的事务状态分开存储。Half 消息存储在 CommitLog 中,与普通消息类似,但会标记其为 Half 消息。事务状态则存储在单独的 ConsumeQueue 中,用于记录每个 Half 消息对应的事务状态。

生产者端实现

  1. 发送 Half 消息 生产者通过 RocketMQ 的客户端 API 发送事务消息时,首先构造一个 Message 对象,并设置其事务相关属性。然后调用 sendMessageInTransaction 方法发送 Half 消息。
import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

public class TransactionProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("transaction_group");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        Message message = new Message("TransactionTopic", "TagA", "key1", "事务消息内容".getBytes());

        TransactionSendResult sendResult = producer.sendMessageInTransaction(message, null, new LocalTransactionExecuter() {
            @Override
            public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
                // 执行本地事务
                try {
                    // 模拟本地事务操作
                    System.out.println("执行本地事务");
                    return LocalTransactionState.COMMIT_MESSAGE;
                } catch (Exception e) {
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                }
            }
        });

        System.out.printf("发送事务消息结果: %s%n", sendResult);
        producer.shutdown();
    }
}

在上述代码中,sendMessageInTransaction 方法接收三个参数:要发送的消息 message、自定义的事务上下文 arg(这里为 null)以及实现了 LocalTransactionExecuter 接口的本地事务执行器。

  1. 本地事务执行LocalTransactionExecuter 接口的 executeLocalTransactionBranch 方法中,开发者需要编写本地事务的具体逻辑。如代码中模拟了一个简单的本地事务操作,并根据操作结果返回 LocalTransactionState.COMMIT_MESSAGELocalTransactionState.ROLLBACK_MESSAGE

  2. 事务状态回查 如果生产者在一定时间内没有向 RocketMQ 提交事务状态,RocketMQ 会通过回调生产者注册的 TransactionCheckListener 接口来回查事务状态。

import org.apache.rocketmq.client.producer.TransactionCheckListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;

import java.util.List;

public class TransactionProducerWithCheck {
    public static void main(String[] args) throws Exception {
        TransactionMQProducer producer = new TransactionMQProducer("transaction_group");
        producer.setNamesrvAddr("localhost:9876");

        producer.setTransactionCheckListener(new TransactionCheckListener() {
            @Override
            public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
                // 回查本地事务状态
                System.out.println("回查本地事务状态,消息: " + new String(msg.getBody()));
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });

        producer.start();

        Message message = new Message("TransactionTopic", "TagA", "key1", "事务消息内容".getBytes());

        TransactionSendResult sendResult = producer.sendMessageInTransaction(message, null, new LocalTransactionExecuter() {
            @Override
            public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
                // 执行本地事务
                try {
                    // 模拟本地事务操作
                    System.out.println("执行本地事务");
                    return LocalTransactionState.COMMIT_MESSAGE;
                } catch (Exception e) {
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                }
            }
        });

        System.out.printf("发送事务消息结果: %s%n", sendResult);
        producer.shutdown();
    }
}

checkLocalTransactionState 方法中,开发者需要根据消息的相关信息(如消息体、消息 key 等)查询本地事务的实际状态,并返回相应的 LocalTransactionState

消费者端实现

消费者端对事务消息的处理与普通消息基本相同。当事务消息被成功提交后,它就像普通消息一样被消费者消费。消费者无需关心消息是否是事务消息,只需要按照正常的消费逻辑进行处理即可。

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class TransactionConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction_consumer_group");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TransactionTopic", "*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("收到消息: " + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("消费者启动");
    }
}

上述代码展示了一个简单的事务消息消费者,它订阅了包含事务消息的主题 TransactionTopic,并在接收到消息后进行打印处理。

RocketMQ 事务消息的应用场景

电商订单系统

在电商平台中,当用户下单后,需要同时完成库存扣减和订单创建两个操作。使用 RocketMQ 事务消息可以确保这两个操作要么都成功,要么都失败。

  1. 发送事务消息:订单服务发送事务消息,消息内容包含订单信息。
  2. 本地事务执行:库存服务执行库存扣减操作作为本地事务。如果库存扣减成功,提交事务消息;如果失败,回滚事务消息。
  3. 消息消费:订单服务消费事务消息,如果消息成功接收,创建订单记录。

金融转账系统

在金融转账场景中,涉及到转出账户扣钱和转入账户加钱两个操作。

  1. 发送事务消息:转账发起方发送事务消息,消息包含转账金额、双方账户等信息。
  2. 本地事务执行:转出账户所在服务执行扣钱操作作为本地事务。若扣钱成功,提交事务消息;若失败,回滚事务消息。
  3. 消息消费:转入账户所在服务消费事务消息,根据消息内容执行加钱操作。

事务消息的可靠性与一致性保障

可靠性保障

  1. 消息持久化:RocketMQ 将 Half 消息和事务状态都持久化存储在磁盘上,确保即使服务器重启,消息也不会丢失。
  2. 重试机制:在事务状态回查时,如果生产者由于网络等原因未能及时响应,RocketMQ 会按照一定的策略进行重试,直到获取到最终的事务状态。

一致性保障

  1. 两阶段提交:通过 Half 消息和事务状态回查机制,保证了消息发送与本地事务执行的原子性,从而确保数据的一致性。
  2. 幂等性处理:消费者在消费事务消息时,需要实现幂等性,以防止重复消费导致的数据不一致问题。例如,在处理订单创建消息时,可以根据订单号进行唯一性检查,避免重复创建订单。

事务消息的性能优化

减少回查次数

  1. 优化本地事务执行时间:尽量缩短本地事务的执行时间,确保在 RocketMQ 设定的回查时间内能够提交事务状态。这可以通过优化数据库操作、减少锁的使用等方式实现。
  2. 提高网络稳定性:确保生产者与 RocketMQ 服务器之间的网络稳定,减少因网络波动导致的事务状态提交失败,从而减少回查次数。

批量处理

  1. 批量发送事务消息:在生产者端,可以将多个相关的事务消息合并为一批进行发送,减少发送次数,提高发送效率。
  2. 批量消费事务消息:在消费者端,支持批量消费事务消息,一次性处理多个消息,减少消费的上下文切换开销。

事务消息的注意事项

事务消息不支持顺序消费

由于事务消息的特殊性,RocketMQ 目前不支持事务消息的顺序消费。在设计业务系统时,如果需要保证消息的顺序性,需要考虑其他解决方案。

事务消息的性能开销

相比于普通消息,事务消息由于涉及两阶段提交和事务状态回查等机制,会带来一定的性能开销。在使用事务消息时,需要根据业务场景评估性能影响,确保系统的整体性能满足需求。

幂等性设计

消费者在处理事务消息时,必须实现幂等性。因为在网络波动等情况下,可能会出现消息重复投递的情况。通过幂等性设计,可以保证即使重复消费消息,也不会对业务数据产生影响。

综上所述,RocketMQ 的事务消息为分布式系统中的事务处理提供了可靠的解决方案。通过深入理解其设计与实现原理,合理应用并注意相关事项,可以有效地解决分布式事务中的数据一致性问题,构建出高可靠、高性能的分布式应用。