MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

RocketMQ顺序消息与事务消息实践

2022-07-054.5k 阅读

RocketMQ顺序消息

顺序消息概念

在很多业务场景中,消息的顺序至关重要。例如,在电商订单处理流程中,订单创建、支付、发货、确认收货等操作的消息必须按照顺序处理,否则可能会导致业务逻辑错误,如用户未支付就发货。RocketMQ 的顺序消息能够确保消息的顺序性,它分为全局顺序消息和分区顺序消息。

全局顺序消息指的是在整个消息队列中,所有消息严格按照发送顺序进行消费。分区顺序消息则是在消息队列的每个分区(Queue)内,消息按照发送顺序进行消费。在大多数实际场景中,分区顺序消息更为常用,因为它在保证局部顺序的同时,还能利用多分区的并行处理能力提高整体的吞吐量。

全局顺序消息实现原理

RocketMQ 实现全局顺序消息依赖于生产者将所有消息发送到同一个队列(Queue),消费者从该队列按顺序消费消息。在发送端,生产者通过选择固定的队列编号,保证所有消息都发往同一队列。例如:

DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
Message msg = new Message("TopicTest", "TagA", "OrderID001", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        return mqs.get(0); // 固定选择第一个队列
    }
}, null);
producer.shutdown();

在消费端,消费者按照顺序从该队列拉取并处理消息:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("TopicTest", "TagA");
consumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        for (MessageExt msg : msgs) {
            System.out.println("ConsumeThread=" + Thread.currentThread().getName() + " Receive New Messages: " + new String(msg.getBody()));
        }
        return ConsumeOrderlyStatus.SUCCESS;
    }
});
consumer.start();

这种方式虽然保证了消息的全局顺序,但由于所有消息都集中在一个队列,吞吐量会受到很大限制,因为单个队列的处理能力是有限的。

分区顺序消息实现原理

分区顺序消息则是在每个分区内保证顺序。生产者通过自定义的 MessageQueueSelector,根据业务键(如订单 ID)将相同业务键的消息发送到同一个队列。例如:

DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
Message msg = new Message("TopicTest", "TagA", "OrderID001", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        long id = (long) arg;
        int index = (int) (id % mqs.size());
        return mqs.get(index); // 根据订单 ID 选择队列
    }
}, 100L); // 假设 100L 为订单 ID
producer.shutdown();

在消费端,消费者从每个队列按顺序消费消息。每个队列的消费是并行的,不同队列之间的消息顺序不做保证,但同一队列内的消息顺序是严格保证的。

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("TopicTest", "TagA");
consumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        for (MessageExt msg : msgs) {
            System.out.println("ConsumeThread=" + Thread.currentThread().getName() + " Receive New Messages: " + new String(msg.getBody()));
        }
        return ConsumeOrderlyStatus.SUCCESS;
    }
});
consumer.start();

这样既保证了同一业务键相关消息的顺序性,又通过多队列并行消费提高了整体的吞吐量。

RocketMQ事务消息

事务消息概念

事务消息用于解决分布式系统中消息发送与本地事务执行的一致性问题。例如,在电商系统中,用户下单后,需要扣减库存并发送消息通知物流系统发货。如果扣减库存成功但消息发送失败,可能导致库存已扣但物流未发货的情况;反之,如果消息发送成功但扣减库存失败,可能导致用户未下单却收到发货通知。RocketMQ 的事务消息能够确保这两个操作要么都成功,要么都失败。

事务消息实现原理

RocketMQ 的事务消息实现基于两阶段提交(2PC)思想。其流程如下:

  1. 发送半消息:生产者先发送一条半消息(Half Message)到 RocketMQ。半消息对消费者是不可见的。
  2. 执行本地事务:生产者在发送半消息成功后,执行本地事务。
  3. 提交或回滚事务:生产者根据本地事务的执行结果,向 RocketMQ 发送 Commit 或 Rollback 消息。如果发送 Commit 消息,RocketMQ 将半消息标记为可消费,消费者可以正常消费该消息;如果发送 Rollback 消息,RocketMQ 将删除半消息,消费者不会收到该消息。
  4. 事务状态回查:如果 RocketMQ 在一定时间内未收到生产者的 Commit 或 Rollback 消息,它会主动向生产者回查事务状态,生产者根据本地事务的实际执行情况返回 Commit 或 Rollback 状态。

事务消息代码示例

首先定义生产者:

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_group");
        producer.setNamesrvAddr("127.0.0.1:9876");

        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });

        producer.setExecutorService(executorService);
        producer.setTransactionListener(new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                // 执行本地事务
                System.out.println("执行本地事务: " + new String(msg.getBody()));
                return LocalTransactionState.COMMIT_MESSAGE; // 模拟本地事务成功
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                // 事务状态回查
                System.out.println("事务状态回查: " + new String(msg.getBody()));
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });

        producer.start();

        Message msg = new Message("transaction_topic", "TagA", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
        SendResult sendResult = producer.sendMessageInTransaction(msg, null);
        System.out.println(sendResult);

        Thread.sleep(10 * 1000);
        producer.shutdown();
    }
}

然后定义消费者:

public class TransactionConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction_consumer_group");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("transaction_topic", "TagA");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("收到事务消息: " + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("消费者启动成功");
    }
}

在上述生产者代码中,executeLocalTransaction 方法模拟执行本地事务,checkLocalTransaction 方法用于事务状态回查。消费者代码则简单地接收并打印事务消息。

顺序消息与事务消息在实际项目中的应用场景结合

在一些复杂的业务场景中,可能既需要保证消息的顺序性,又需要确保消息发送与本地事务的一致性。例如,在金融交易系统中,一笔转账操作可能涉及多个步骤,如扣减转出账户余额、增加转入账户余额,并发送消息通知相关系统记录交易日志。这些操作不仅要保证顺序执行,而且要保证整个过程的原子性,即要么全部成功,要么全部失败。

此时,可以将顺序消息与事务消息结合使用。生产者在发送事务消息时,通过自定义的 MessageQueueSelector 将同一笔转账相关的消息发送到同一个队列,保证这些消息在该队列内的顺序性。同时,利用事务消息的机制确保整个转账操作与消息发送的一致性。

结合应用的代码示例

public class CombinedProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionMQProducer producer = new TransactionMQProducer("combined_producer_group");
        producer.setNamesrvAddr("127.0.0.1:9876");

        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });

        producer.setExecutorService(executorService);
        producer.setTransactionListener(new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                // 执行本地事务
                System.out.println("执行本地事务: " + new String(msg.getBody()));
                return LocalTransactionState.COMMIT_MESSAGE; // 模拟本地事务成功
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                // 事务状态回查
                System.out.println("事务状态回查: " + new String(msg.getBody()));
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });

        producer.start();

        Message msg = new Message("combined_topic", "TagA", "TransferID001", "转账操作消息".getBytes(RemotingHelper.DEFAULT_CHARSET));
        SendResult sendResult = producer.sendMessageInTransaction(msg, new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                long id = (long) arg;
                int index = (int) (id % mqs.size());
                return mqs.get(index); // 根据转账 ID 选择队列
            }
        }, 100L); // 假设 100L 为转账 ID
        System.out.println(sendResult);

        Thread.sleep(10 * 1000);
        producer.shutdown();
    }
}
public class CombinedConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("combined_consumer_group");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("combined_topic", "TagA");

        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("收到结合应用消息: " + new String(msg.getBody()));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();
        System.out.println("结合应用消费者启动成功");
    }
}

在上述代码中,生产者使用事务消息发送消息,并通过自定义的 MessageQueueSelector 将同一转账 ID 的消息发送到同一队列,保证顺序性。消费者则以顺序消息的方式消费这些消息,确保按照顺序处理转账相关的操作。

RocketMQ顺序消息与事务消息的性能优化

顺序消息性能优化

  1. 合理分区:根据业务量和消息分布情况,合理设置队列数量。如果队列数量过少,可能导致单个队列压力过大,影响顺序消息的处理性能;如果队列数量过多,可能会增加系统的管理开销。可以通过对历史数据的分析,预估每个队列可能承载的消息量,从而确定合适的队列数量。
  2. 批量消费:消费者端可以采用批量消费的方式,一次从队列中拉取多条消息进行处理。RocketMQ 支持设置每次拉取消息的最大数量,这样可以减少拉取消息的次数,提高消费效率。例如:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
consumer.setConsumeMessageBatchMaxSize(10); // 设置每次最大消费 10 条消息
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("TopicTest", "TagA");
consumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        for (MessageExt msg : msgs) {
            System.out.println("ConsumeThread=" + Thread.currentThread().getName() + " Receive New Messages: " + new String(msg.getBody()));
        }
        return ConsumeOrderlyStatus.SUCCESS;
    }
});
consumer.start();
  1. 优化本地处理逻辑:在消费者处理消息的逻辑中,尽量减少复杂的计算和 I/O 操作。如果有一些耗时较长的操作,可以考虑将其异步化或者放到其他线程池中执行,以避免阻塞消息的消费。

事务消息性能优化

  1. 减少事务回查次数:在生产者执行本地事务时,尽量保证事务的执行速度和成功率,减少 RocketMQ 进行事务状态回查的次数。可以通过优化本地数据库操作、确保网络稳定等方式来提高本地事务的执行效率。
  2. 合理设置回查时间间隔:RocketMQ 的事务回查时间间隔可以通过配置进行调整。如果设置过短,可能会频繁回查,增加系统开销;如果设置过长,可能会导致事务长时间处于不确定状态。可以根据业务场景和系统性能情况,合理设置回查时间间隔。
  3. 批量事务消息:在一些场景下,如果多个本地事务之间相互独立且可以批量处理,可以考虑发送批量事务消息。这样可以减少发送半消息和提交/回滚事务的次数,提高整体性能。不过需要注意,批量事务消息中的所有消息必须都属于同一个事务,即要么全部成功,要么全部失败。

RocketMQ顺序消息与事务消息的异常处理

顺序消息异常处理

  1. 消费失败重试:当消费者处理顺序消息失败时,RocketMQ 会自动进行重试。默认情况下,顺序消息的重试次数为 16 次。在重试过程中,消息的消费顺序不会改变。如果多次重试后仍然失败,可以将该消息记录到死信队列(DLQ)中,以便后续进行人工处理。例如:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("TopicTest", "TagA");
consumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        try {
            for (MessageExt msg : msgs) {
                // 处理消息
                System.out.println("ConsumeThread=" + Thread.currentThread().getName() + " Receive New Messages: " + new String(msg.getBody()));
            }
            return ConsumeOrderlyStatus.SUCCESS;
        } catch (Exception e) {
            // 消费失败,返回重试状态
            return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
        }
    }
});
consumer.start();
  1. 死信队列处理:进入死信队列的消息可以通过 RocketMQ 的管理工具或者自定义程序进行查询和处理。可以根据死信消息的具体情况,分析失败原因,进行人工修复或者重新发送等操作。

事务消息异常处理

  1. 事务回滚:如果生产者在执行本地事务时失败,或者在事务状态回查时发现本地事务执行失败,应及时向 RocketMQ 发送 Rollback 消息,将半消息删除,避免消费者收到不一致的消息。
  2. 异常日志记录:在生产者和消费者端,都应该详细记录事务消息处理过程中的异常信息。通过分析这些日志,可以快速定位问题,如网络故障、本地事务逻辑错误等,并采取相应的解决措施。
  3. 手动补偿:对于一些无法自动恢复的事务消息异常情况,如数据库死锁导致本地事务失败且无法重试,可以通过人工介入进行补偿操作。例如,手动更新数据库状态,重新发送事务消息等。

RocketMQ顺序消息与事务消息的监控与运维

顺序消息监控与运维

  1. 队列监控:通过 RocketMQ 的控制台或者监控工具,实时监控每个队列的消息堆积情况、消费进度等指标。如果发现某个队列消息堆积严重,可能是该队列的消费者处理速度过慢,需要及时排查原因,如增加消费者实例数量、优化消费逻辑等。
  2. 消费性能监控:监控消费者的处理时间、吞吐量等性能指标。可以通过在消费者代码中添加性能统计逻辑,将相关指标上报到监控系统。如果发现消费性能下降,及时分析是由于消息处理逻辑复杂还是系统资源不足导致的,并进行相应的优化。
  3. 故障恢复:当出现消费者故障或者网络故障导致顺序消息消费中断时,RocketMQ 会自动进行故障恢复。但在恢复过程中,需要确保消息的顺序不被打乱。可以通过设置合适的重试策略和恢复机制,保证系统的稳定性和消息顺序性。

事务消息监控与运维

  1. 事务状态监控:实时监控事务消息的状态,包括半消息的数量、已提交和已回滚的事务消息数量等。如果发现半消息数量持续增加且长时间未被处理,可能是生产者的事务状态回查出现问题,需要及时排查原因。
  2. 回查监控:监控事务状态回查的次数和成功率。如果回查次数过多或者成功率过低,说明本地事务执行可能存在不稳定因素,需要优化本地事务逻辑,确保事务的可靠性。
  3. 数据一致性检查:定期检查事务消息相关的数据一致性。例如,检查数据库中的事务记录与 RocketMQ 中的事务消息状态是否匹配,确保数据的准确性和一致性。对于不一致的数据,及时进行修复。

通过以上对 RocketMQ 顺序消息与事务消息的深入探讨、代码示例、性能优化、异常处理以及监控运维等方面的介绍,相信开发者能够更好地在实际项目中应用这两种消息类型,构建出可靠、高效的分布式系统。在实际应用过程中,还需要根据具体的业务场景和系统需求,灵活调整和优化相关配置与代码逻辑,以达到最佳的应用效果。