MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

RocketMQ 消息幂等性的实现方法

2022-08-065.8k 阅读

RocketMQ 消息幂等性基础概念

在深入探讨 RocketMQ 消息幂等性的实现方法之前,我们先来清晰地理解什么是消息幂等性。在消息队列的使用场景中,幂等性指的是无论一条消息被重复发送了多少次,其对系统产生的最终影响应该是一致的。例如,在电商系统中,如果一条创建订单的消息由于网络波动等原因被重复发送,幂等性确保该订单只会被创建一次,而不会出现重复创建订单的情况。

对于 RocketMQ 而言,由于其异步、高并发的特性,消息重复发送是可能出现的。这可能是由于网络抖动、生产者重试机制、消费者端处理超时等多种原因导致。如果业务系统不能正确处理消息重复,可能会引发数据不一致、业务逻辑错误等问题。因此,实现消息幂等性对于保障业务系统的正确性和稳定性至关重要。

RocketMQ 自身特性对幂等性的支持

RocketMQ 在设计上提供了一些特性来辅助实现消息幂等性。

消息唯一标识

RocketMQ 为每条消息提供了唯一的标识,即 MessageID。MessageID 是在消息发送到 Broker 时由 Broker 生成的,具有全局唯一性。然而,MessageID 对于业务开发者来说,直接用于幂等性判断存在一定局限性。因为在某些情况下,例如消息被重新发送时,MessageID 可能会改变。这是因为重新发送的消息在 Broker 端会被当作新的消息处理,从而生成新的 MessageID。所以,单纯依靠 MessageID 并不能完全满足幂等性判断的需求。

事务消息

RocketMQ 支持事务消息,这为幂等性的实现提供了有力支持。事务消息允许生产者在发送消息的同时,执行本地事务。只有当本地事务执行成功后,消息才会真正被投递到消费者端。如果本地事务执行失败,消息将不会被投递,从而避免了因消息重复投递导致的业务重复执行问题。例如,在电商下单场景中,生产者在发送创建订单消息前,先执行创建订单的本地数据库事务。如果事务成功,再将消息发送出去。这样即使消息重复发送,由于订单已经在本地事务中创建成功,再次处理时也不会产生重复创建订单的情况。

基于业务唯一标识实现幂等性

业务唯一标识的选择

在实际业务场景中,我们需要选择合适的业务唯一标识来实现幂等性。这个标识应该能够唯一地标识一个业务操作,例如在订单系统中,订单号就是一个很好的业务唯一标识。因为每个订单都有唯一的订单号,通过订单号我们可以准确地判断该订单相关的操作是否已经执行过。

代码示例(Java)

下面我们通过一个简单的 Java 代码示例,展示如何基于业务唯一标识实现幂等性。假设我们有一个处理订单支付的业务逻辑,订单号作为业务唯一标识。

首先,定义订单支付的业务逻辑类:

public class OrderPaymentService {
    private Map<String, Boolean> paymentRecord = new HashMap<>();

    public boolean processPayment(String orderId) {
        // 检查是否已经处理过该订单支付
        if (paymentRecord.containsKey(orderId)) {
            return true;
        }
        // 模拟支付处理逻辑
        boolean paymentResult = performPayment(orderId);
        if (paymentResult) {
            paymentRecord.put(orderId, true);
        }
        return paymentResult;
    }

    private boolean performPayment(String orderId) {
        // 实际的支付操作,例如调用支付接口等
        System.out.println("Processing payment for order: " + orderId);
        // 这里简单返回 true 表示支付成功
        return true;
    }
}

然后,在 RocketMQ 的消费者端使用这个业务逻辑类来处理消息:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class OrderPaymentConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_payment_consumer_group");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("order_payment_topic", "*");

        OrderPaymentService paymentService = new OrderPaymentService();

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    String orderId = new String(msg.getBody());
                    boolean result = paymentService.processPayment(orderId);
                    if (result) {
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                }
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        });

        consumer.start();
        System.out.println("Order payment consumer started successfully.");
    }
}

在上述代码中,OrderPaymentService类维护了一个paymentRecord来记录已经处理过的订单支付情况。processPayment方法在处理支付前先检查订单号是否已经在记录中,如果存在则直接返回成功,实现了幂等性。OrderPaymentConsumer类作为 RocketMQ 的消费者,从order_payment_topic主题接收消息,消息内容为订单号,然后调用OrderPaymentServiceprocessPayment方法处理支付。

基于数据库实现幂等性

数据库唯一约束

利用数据库的唯一约束特性是实现幂等性的常用方法之一。例如,在数据库表中为某个业务字段设置唯一索引。以订单支付为例,我们可以在支付记录表中为订单号字段设置唯一索引。当处理支付消息时,尝试向支付记录表中插入一条记录,如果插入成功,说明该订单支付操作是首次执行;如果因为唯一索引冲突插入失败,说明该订单支付已经处理过,无需再次处理。

代码示例(Java + MySQL)

假设我们使用 MySQL 数据库,首先创建支付记录表:

CREATE TABLE order_payment (
    id INT AUTO_INCREMENT PRIMARY KEY,
    order_id VARCHAR(255) NOT NULL UNIQUE,
    payment_status VARCHAR(50),
    payment_time TIMESTAMP
);

然后,编写 Java 代码来处理支付消息并利用数据库唯一约束实现幂等性:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class OrderPaymentDBService {
    private static final String INSERT_PAYMENT_SQL = "INSERT INTO order_payment (order_id, payment_status, payment_time) VALUES (?,?, NOW())";

    public boolean processPayment(String orderId) {
        try (Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/yourdatabase", "username", "password");
             PreparedStatement statement = connection.prepareStatement(INSERT_PAYMENT_SQL)) {
            statement.setString(1, orderId);
            statement.setString(2, "PAID");
            int rowsInserted = statement.executeUpdate();
            return rowsInserted > 0;
        } catch (SQLException e) {
            if (e.getSQLState().equals("23000")) {
                // 唯一索引冲突,说明已经处理过
                return true;
            }
            e.printStackTrace();
            return false;
        }
    }
}

在 RocketMQ 消费者端使用这个服务:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class OrderPaymentDBConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_payment_db_consumer_group");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("order_payment_topic", "*");

        OrderPaymentDBService paymentService = new OrderPaymentDBService();

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    String orderId = new String(msg.getBody());
                    boolean result = paymentService.processPayment(orderId);
                    if (result) {
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                }
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        });

        consumer.start();
        System.out.println("Order payment database consumer started successfully.");
    }
}

在上述代码中,OrderPaymentDBService类通过执行 SQL 插入语句来处理订单支付。如果插入因为唯一索引冲突失败,捕获SQLException并根据 SQL 状态码判断为已处理过,返回成功。OrderPaymentDBConsumer类作为 RocketMQ 消费者,调用OrderPaymentDBServiceprocessPayment方法处理消息。

基于 Redis 实现幂等性

Redis 的原子操作

Redis 提供了丰富的原子操作,如SETNX(Set if Not eXists),可以有效地用于实现幂等性。SETNX命令用于在指定的键不存在时,为键设置指定的值。利用这个特性,我们可以将业务唯一标识作为 Redis 键,当处理消息时,使用SETNX尝试设置键值。如果设置成功,说明该业务操作是首次执行;如果设置失败,说明已经处理过。

代码示例(Java + Jedis)

假设我们使用 Jedis 来操作 Redis,首先引入 Jedis 依赖:

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>3.6.0</version>
</dependency>

然后编写实现幂等性的代码:

import redis.clients.jedis.Jedis;

public class OrderPaymentRedisService {
    private Jedis jedis;

    public OrderPaymentRedisService() {
        jedis = new Jedis("localhost", 6379);
    }

    public boolean processPayment(String orderId) {
        Long result = jedis.setnx("order_payment:" + orderId, "processed");
        if (result == 1) {
            // 设置成功,说明首次处理
            return performPayment(orderId);
        }
        return true;
    }

    private boolean performPayment(String orderId) {
        // 实际的支付操作,例如调用支付接口等
        System.out.println("Processing payment for order: " + orderId);
        // 这里简单返回 true 表示支付成功
        return true;
    }
}

在 RocketMQ 消费者端使用这个服务:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class OrderPaymentRedisConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_payment_redis_consumer_group");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("order_payment_topic", "*");

        OrderPaymentRedisService paymentService = new OrderPaymentRedisService();

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    String orderId = new String(msg.getBody());
                    boolean result = paymentService.processPayment(orderId);
                    if (result) {
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                }
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        });

        consumer.start();
        System.out.println("Order payment Redis consumer started successfully.");
    }
}

在上述代码中,OrderPaymentRedisService类通过jedis.setnx方法尝试设置 Redis 键。如果设置成功,调用实际的支付方法;如果设置失败,说明已经处理过,直接返回成功。OrderPaymentRedisConsumer类作为 RocketMQ 消费者,调用OrderPaymentRedisServiceprocessPayment方法处理消息。

实现幂等性时的注意事项

性能与资源消耗

在选择幂等性实现方法时,需要考虑性能和资源消耗。例如,基于数据库唯一约束的方法虽然简单直接,但每次插入操作都可能涉及数据库的 I/O 操作,在高并发场景下可能会成为性能瓶颈。而基于 Redis 的方法,由于 Redis 是内存数据库,性能较高,但需要额外的 Redis 资源。因此,需要根据实际业务场景的并发量、性能要求等因素综合选择合适的实现方法。

分布式环境下的一致性

在分布式系统中,实现幂等性还需要考虑一致性问题。例如,在基于 Redis 实现幂等性时,如果 Redis 采用集群部署,可能会存在数据同步延迟的情况。在这种情况下,可能会出现短暂的不一致,导致幂等性判断出现偏差。为了解决这个问题,可以采用 Redis 单节点部署或者使用 Redis 提供的分布式锁等机制来确保数据的一致性。

消息处理顺序

在某些业务场景中,消息处理顺序可能会影响幂等性的实现。例如,在一个库存扣减的场景中,如果先处理了库存增加的消息,后处理库存减少的消息,可能会导致库存数据异常。因此,在设计幂等性方案时,需要考虑消息处理顺序,并结合业务逻辑确保无论消息顺序如何,最终的业务结果都是正确的。

总结不同实现方法的适用场景

基于业务唯一标识的内存记录

适用于业务逻辑相对简单、单机部署或者并发量不高的场景。这种方法实现简单,不需要额外的外部存储,但在分布式环境下需要考虑内存数据的同步问题。

基于数据库唯一约束

适用于对数据一致性要求较高,对性能要求不是特别苛刻的场景。由于数据库操作相对较慢,不适合高并发场景,但能保证数据的强一致性。

基于 Redis 实现幂等性

适用于高并发场景,对性能要求较高。Redis 的高性能和丰富的原子操作使其能够快速处理幂等性判断,但需要注意分布式环境下的一致性问题。

通过深入理解 RocketMQ 消息幂等性的概念,并结合不同的实现方法及其适用场景,开发者可以根据具体业务需求选择最合适的幂等性实现方案,确保业务系统在面对消息重复时的稳定性和正确性。在实际应用中,还需要不断地进行性能测试和优化,以满足业务发展的需求。同时,随着业务的不断变化和系统规模的扩大,幂等性实现方案也可能需要不断地调整和完善。