MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

RocketMQ 消息堆积问题分析与解决

2021-11-185.1k 阅读

RocketMQ 消息堆积现象及影响

在后端开发中,当 RocketMQ 出现消息堆积时,从现象上看,消息在 Broker 端大量积压,无法及时被消费。这会导致后续生产的消息也持续滞留在队列中,使得队列长度不断增长。

消息堆积带来的影响十分严重。首先,对于业务实时性要求高的场景,如金融交易、实时监控等,消息不能及时处理,会使业务数据出现延迟,影响业务决策。例如在股票交易系统中,实时的交易信息如果不能及时处理,可能导致交易延迟,给投资者带来损失。其次,大量堆积的消息占用 Broker 的存储资源,若存储空间耗尽,可能导致 Broker 无法正常工作,甚至引发整个消息队列系统的瘫痪。而且,消息堆积还可能影响系统的扩展性,当业务量增长时,堆积问题会更加突出,阻碍系统的进一步发展。

消息堆积原因分析

  1. 消费者消费能力不足
    • 消费逻辑复杂:如果消费者在处理消息时,包含大量复杂的业务逻辑,如涉及多个数据库事务操作、复杂的计算等,会导致单个消息的处理时间过长。例如,在一个电商订单处理系统中,消费者在处理订单消息时,不仅要更新订单状态到数据库,还需要根据订单信息进行库存扣减、积分计算等一系列复杂操作。假设每个操作平均耗时 100ms,一次订单处理可能就需要几百毫秒甚至更长时间,这大大降低了消费速度。
    • 消费线程数设置不合理:RocketMQ 消费者可以通过设置线程数来控制消费并发度。如果线程数设置过少,在面对大量消息时,无法充分利用系统资源进行消费。例如,在一个高并发的日志采集系统中,每秒可能产生数千条日志消息,但如果消费者线程数仅设置为 10 个,每个线程处理一条消息平均需要 100ms,那么每秒最多处理 100 条消息,远远无法满足实际需求,从而导致消息堆积。
  2. 生产者生产速度过快
    • 业务突发流量:在某些特殊场景下,如电商大促活动、限时抢购等,业务流量会瞬间爆发,生产者会在短时间内产生大量消息。以“双 11”电商促销活动为例,零点刚过,大量用户同时下单,订单消息会像潮水般涌进 RocketMQ,远远超出了正常情况下消费者的处理能力,进而造成消息堆积。
    • 生产者发送策略不当:如果生产者采用过于激进的发送策略,如不考虑 Broker 的处理能力,持续高速发送消息,也容易导致消息堆积。例如,生产者没有设置合理的发送间隔,或者没有对发送结果进行有效处理,即使 Broker 已经出现负载过高的情况,仍然不断发送消息,最终使得 Broker 不堪重负,消息大量堆积。
  3. Broker 相关问题
    • 磁盘 I/O 瓶颈:RocketMQ 默认将消息持久化到磁盘。如果磁盘 I/O 性能较差,如使用了传统机械硬盘,在大量消息写入时,会出现 I/O 瓶颈。当 Broker 处理消息堆积时,既要接收新消息写入磁盘,又要从磁盘读取消息给消费者,磁盘 I/O 压力巨大。例如,在一个使用机械硬盘的老旧服务器部署的 Broker 环境中,写入速度可能只有每秒几十 MB,无法满足每秒数百 MB 的消息写入需求,从而导致消息堆积。
    • Broker 配置不合理:Broker 的一些关键配置参数,如内存缓冲区大小、线程池配置等,如果设置不合理,也会影响消息的处理能力。例如,Broker 的内存缓冲区设置过小,无法缓存大量消息,当消息写入速度大于从缓冲区刷盘到磁盘的速度时,就会导致消息堆积。另外,如果 Broker 的线程池线程数设置不合理,在处理消息请求时,无法及时分配线程进行处理,也会造成消息处理延迟,进而引发消息堆积。

消息堆积问题解决策略

  1. 提升消费者消费能力
    • 优化消费逻辑
      • 减少不必要的操作:仔细梳理消费逻辑,去除那些对业务结果没有实质影响或者可以异步处理的操作。例如,在上述电商订单处理系统中,如果积分计算不是实时性要求极高的操作,可以将其从订单处理的同步流程中分离出来,采用异步方式处理,如使用另一个消息队列专门处理积分计算任务,这样订单消息的处理速度就能大幅提升。
      • 优化数据库操作:对数据库操作进行优化,如合理使用索引、批量操作等。在订单状态更新和库存扣减操作中,如果数据库表设计合理且使用了合适的索引,单个数据库操作的时间可以显著缩短。同时,将多个相关的数据库操作合并为批量操作,减少数据库交互次数。例如,原本需要对 10 条库存记录进行扣减,每次操作一条记录,共需 10 次数据库交互。如果采用批量操作,一次交互就可以完成 10 条记录的扣减,大大提高了操作效率。
    • 合理调整消费线程数:根据业务场景和服务器资源情况,合理调整消费者的线程数。可以通过性能测试来确定最佳线程数。例如,在一个消息处理任务中,首先设置较低的线程数,如 10 个线程,观察系统的消费吞吐量和资源利用率。然后逐步增加线程数,每次增加 10 个线程,记录不同线程数下的消费吞吐量和系统资源(如 CPU、内存、网络带宽)的使用情况。通过分析这些数据,找到消费吞吐量达到峰值且系统资源利用合理的线程数。假设在测试过程中,发现当线程数为 50 时,消费吞吐量最高,且系统资源没有出现过度消耗的情况,那么就可以将消费者线程数设置为 50。
  2. 控制生产者生产速度
    • 流量削峰
      • 使用令牌桶算法:生产者可以采用令牌桶算法来控制消息发送速度。令牌桶算法的原理是系统以固定速率生成令牌放入桶中,当生产者要发送消息时,需要从桶中获取一个令牌,如果桶中没有令牌,则等待令牌生成或者丢弃消息。在 Java 中,可以使用 Guava 库来实现令牌桶算法。以下是一个简单的代码示例:
import com.google.common.util.concurrent.RateLimiter;

public class ProducerWithTokenBucket {
    private RateLimiter rateLimiter;

    public ProducerWithTokenBucket(double permitsPerSecond) {
        rateLimiter = RateLimiter.create(permitsPerSecond);
    }

    public void sendMessage(String message) {
        rateLimiter.acquire();// 获取令牌
        // 实际的消息发送逻辑,这里简单打印
        System.out.println("Sending message: " + message);
    }
}

在上述代码中,ProducerWithTokenBucket 类使用 RateLimiter 来控制消息发送速度。permitsPerSecond 参数表示每秒生成的令牌数,即消息发送的速率。通过调用 rateLimiter.acquire() 方法获取令牌,只有获取到令牌后才进行消息发送。 - 采用消息队列进行缓冲:在生产者和 RocketMQ 之间再引入一个本地消息队列作为缓冲。当业务流量突发时,生产者先将消息发送到本地消息队列,然后由本地消息队列以稳定的速度将消息发送到 RocketMQ。这样可以避免生产者直接向 RocketMQ 发送大量消息导致堆积。例如,可以使用 Disruptor 这种高性能的本地消息队列框架。Disruptor 采用无锁环形队列的方式,能够提供极高的消息处理性能。

  • 优化生产者发送策略
    • 设置合理的发送间隔:生产者在发送消息时,设置合适的发送间隔,避免连续高速发送消息。例如,可以根据 RocketMQ 的性能指标和当前系统负载情况,动态调整发送间隔。如果 Broker 的负载较低,可以适当缩短发送间隔;如果 Broker 负载较高,则增加发送间隔。可以通过定期获取 Broker 的负载信息(如 CPU 使用率、内存使用率、消息堆积量等)来进行动态调整。
    • 处理发送结果:生产者在发送消息后,要对发送结果进行及时处理。如果发送失败,根据失败原因进行相应处理,如重试发送、记录日志等。在 RocketMQ 中,发送消息的代码示例如下:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class RocketMQProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        for (int i = 0; i < 10; i++) {
            Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes("UTF-8") /* Message body */
            );
            SendResult sendResult = producer.send(msg);
            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                // 处理发送失败情况,如重试
                System.out.println("Send message failed, retry...");
                // 这里简单示例,实际可根据业务需求进行更复杂的重试逻辑
                sendResult = producer.send(msg);
            }
            System.out.printf("%s%n", sendResult);
        }
        producer.shutdown();
    }
}

在上述代码中,发送消息后通过判断 sendResult.getSendStatus() 是否为 SEND_OK 来确定发送是否成功。如果发送失败,可以根据业务需求进行重试等处理。 3. 优化 Broker 性能

  • 提升磁盘 I/O 性能
    • 更换存储设备:将传统机械硬盘更换为固态硬盘(SSD)。SSD 具有读写速度快、随机 I/O 性能好的特点,可以显著提升消息的持久化和读取速度。例如,使用三星 980 PRO 等高性能 SSD,其顺序读取速度可达 7000MB/s 以上,顺序写入速度可达 5100MB/s 以上,相比传统机械硬盘的几十 MB/s 的读写速度,能极大地改善 Broker 的 I/O 性能。
    • 优化磁盘 I/O 配置:合理调整磁盘的 I/O 调度算法。在 Linux 系统中,可以根据服务器的负载情况选择合适的调度算法,如 deadline、cfq 等。对于 RocketMQ 这种对 I/O 性能要求较高的应用场景,deadline 调度算法通常能提供较好的性能。可以通过修改 /sys/block/sda/queue/scheduler 文件来切换调度算法(假设磁盘设备为 /dev/sda)。
  • 优化 Broker 配置
    • 调整内存缓冲区大小:根据服务器的内存资源和业务消息量,合理调整 Broker 的内存缓冲区大小。在 RocketMQ 的配置文件 broker.conf 中,可以通过 mapedFileSizeCommitLog 参数来调整 CommitLog 文件的大小,该文件用于存储消息,其大小会影响内存缓冲区的使用。一般来说,如果服务器内存充足且消息量较大,可以适当增大该值,如将其设置为 1073741824(即 1GB)。
    • 优化线程池配置:调整 Broker 的线程池参数,如线程池大小、队列容量等。在 RocketMQ 的源码中,BrokerController 类中涉及到线程池的配置。例如,可以根据服务器的 CPU 核心数和业务负载情况,合理设置处理消息请求的线程池大小。如果服务器有 8 个 CPU 核心,且业务负载较高,可以将线程池大小设置为 16 或更多,以充分利用 CPU 资源,提高消息处理能力。

消息堆积监控与预警

  1. 监控指标
    • 消息堆积量:通过 RocketMQ 的管理控制台或者 API 可以获取每个 Topic 的消息堆积数量。例如,在 RocketMQ 管理控制台的 Topic 详情页面,可以直观地看到当前 Topic 的消息堆积量。也可以通过 RocketMQ 的 OpenAPI 来获取,以下是一个简单的 Java 代码示例,通过 RocketMQ 的 OpenAPI 获取 Topic 的消息堆积量:
import org.apache.rocketmq.client.admin.DefaultMQAdminExt;
import org.apache.rocketmq.common.admin.TopicStatsTable;

public class RocketMQMonitor {
    public static void main(String[] args) throws Exception {
        DefaultMQAdminExt admin = new DefaultMQAdminExt();
        admin.setNamesrvAddr("localhost:9876");
        admin.start();
        TopicStatsTable topicStatsTable = admin.examineTopicStats("TopicTest");
        long queueSize = topicStatsTable.getQueueTotalSize();
        System.out.println("TopicTest message queue size: " + queueSize);
        admin.shutdown();
    }
}

在上述代码中,通过 DefaultMQAdminExtexamineTopicStats 方法获取 Topic 的统计信息,其中 getQueueTotalSize 方法返回消息队列的总大小,即消息堆积量。

  • 消费延迟:计算消息从生产到消费的时间差。可以在生产者发送消息时,在消息体中添加发送时间戳,消费者在处理消息时获取当前时间,计算两者的时间差。例如,在生产者发送消息时:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class ProducerWithTimestamp {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        long sendTime = System.currentTimeMillis();
        Message msg = new Message("TopicTest" /* Topic */,
                "TagA" /* Tag */,
                ("Hello RocketMQ " + sendTime).getBytes("UTF-8") /* Message body */
        );
        SendResult sendResult = producer.send(msg);
        System.out.printf("%s%n", sendResult);
        producer.shutdown();
    }
}

在消费者处理消息时:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class ConsumerWithTimestamp {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("TopicTest", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                          ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    String messageBody = new String(msg.getBody());
                    long sendTime = Long.parseLong(messageBody.split(" ")[1]);
                    long consumeTime = System.currentTimeMillis();
                    long delay = consumeTime - sendTime;
                    System.out.println("Message delay: " + delay + " ms");
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer started.");
    }
}

通过这种方式可以计算出每条消息的消费延迟。

  • Broker 资源利用率:监控 Broker 服务器的 CPU 使用率、内存使用率、磁盘 I/O 使用率和网络带宽使用率等。在 Linux 系统中,可以使用 top 命令查看 CPU 和内存使用率,使用 iostat 命令查看磁盘 I/O 使用率,使用 ifstat 命令查看网络带宽使用率。也可以通过一些监控工具,如 Prometheus + Grafana 来实现对这些指标的实时监控和可视化展示。
  1. 预警机制
    • 设置阈值:根据业务需求和系统性能指标,为监控指标设置合理的阈值。例如,将消息堆积量的阈值设置为 10000 条,如果某个 Topic 的消息堆积量超过这个阈值,就触发预警。对于消费延迟,可以设置阈值为 500ms,如果消息的平均消费延迟超过这个值,也触发预警。对于 Broker 的资源利用率,如 CPU 使用率阈值设置为 80%,内存使用率阈值设置为 90%,当达到这些阈值时,发出预警。
    • 预警方式:可以采用多种预警方式,如邮件预警、短信预警、即时通讯工具(如钉钉、微信)预警等。以邮件预警为例,可以使用 Java 的 JavaMail API 来实现。以下是一个简单的邮件发送示例:
import javax.mail.*;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage;
import java.util.Properties;

public class EmailAlert {
    public static void sendAlert(String subject, String content) {
        String from = "sender@example.com";
        String password = "password";
        String to = "receiver@example.com";

        Properties props = new Properties();
        props.put("mail.smtp.auth", "true");
        props.put("mail.smtp.starttls.enable", "true");
        props.put("mail.smtp.host", "smtp.example.com");
        props.put("mail.smtp.port", "587");

        Session session = Session.getInstance(props,
                new javax.mail.Authenticator() {
                    protected PasswordAuthentication getPasswordAuthentication() {
                        return new PasswordAuthentication(from, password);
                    }
                });

        try {
            Message message = new MimeMessage(session);
            message.setFrom(new InternetAddress(from));
            message.setRecipients(Message.RecipientType.TO,
                    InternetAddress.parse(to));
            message.setSubject(subject);
            message.setText(content);

            Transport.send(message);

            System.out.println("Email sent successfully.");
        } catch (MessagingException e) {
            throw new RuntimeException(e);
        }
    }
}

在实际应用中,可以在监控程序检测到指标超过阈值时,调用 sendAlert 方法发送预警邮件。通过及时的预警,运维人员可以快速发现 RocketMQ 消息堆积问题,并采取相应的解决措施。

案例分析

  1. 案例背景 某电商平台在一次促销活动中,使用 RocketMQ 作为消息队列来处理订单消息、库存消息等。活动期间,突然出现消息堆积问题,导致订单处理延迟,用户反馈下单后长时间未看到订单状态更新。
  2. 问题排查
    • 消费者方面:通过查看消费者的日志,发现消费者在处理订单消息时,由于业务逻辑中包含复杂的库存校验和积分计算,且数据库操作未进行优化,导致单个订单消息处理时间长达 500ms 以上。同时,消费者线程数设置为 20,在高并发消息处理时,明显不足。
    • 生产者方面:活动期间,业务流量瞬间增长数倍,生产者没有采取任何流量控制措施,持续高速发送消息到 RocketMQ,远远超过了消费者的处理能力。
    • Broker 方面:Broker 服务器使用的是传统机械硬盘,在大量消息写入时,出现磁盘 I/O 瓶颈,消息持久化速度缓慢,进一步加剧了消息堆积。
  3. 解决措施
    • 优化消费者:对消费逻辑进行优化,将积分计算从订单处理的同步流程中分离出来,采用异步处理。同时,优化数据库操作,对库存校验相关的数据库表添加索引,并将多个库存操作合并为批量操作。此外,根据服务器资源情况,将消费者线程数增加到 50。
    • 控制生产者:在生产者端引入令牌桶算法进行流量控制,设置每秒发送消息的速率为 1000 条。同时,对发送结果进行处理,若发送失败,进行三次重试。
    • 优化 Broker:将 Broker 服务器的磁盘更换为固态硬盘,提升磁盘 I/O 性能。并且调整 Broker 的内存缓冲区大小,将 mapedFileSizeCommitLog 增大到 1GB,优化线程池配置,增加处理消息请求的线程数。
  4. 效果验证 经过上述优化后,消息堆积问题得到有效解决。在后续的小规模压力测试和实际业务运行中,消息能够及时被消费,订单处理延迟从原来的数秒缩短到了 100ms 以内,系统性能得到显著提升,用户体验也得到了极大改善。

通过对这个实际案例的分析,可以看到 RocketMQ 消息堆积问题的解决需要从消费者、生产者和 Broker 多个方面入手,综合运用各种优化策略,才能确保消息队列系统的稳定高效运行。在实际的后端开发中,应根据具体业务场景和系统特点,灵活运用这些方法,以保障业务的正常进行。