MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

RocketMQ监控与告警系统搭建

2023-12-145.6k 阅读

1. RocketMQ 基础概述

RocketMQ 是一款分布式、队列模型的消息中间件,由阿里巴巴开源,具有低延迟、高并发、高可用、海量消息堆积能力等特性。它主要由 NameServer、Broker、Producer、Consumer 等组件构成。

  • NameServer:是一个轻量级的注册中心,主要用来保存 Broker 的路由信息,Producer 和 Consumer 通过 NameServer 来发现 Broker 的地址。
  • Broker:负责消息的存储、投递和查询等操作,它接收 Producer 发送过来的消息,同时为 Consumer 提供拉取消息的服务。
  • Producer:消息的生产者,负责向 Broker 发送消息。
  • Consumer:消息的消费者,从 Broker 拉取消息并进行处理。

2. 监控的重要性

在 RocketMQ 的实际应用中,监控系统起着至关重要的作用。随着业务规模的增长,消息队列的流量、性能等指标会发生变化,如果不能及时监控到这些变化,可能会导致消息积压、服务不可用等问题。通过监控,我们可以实时了解 RocketMQ 的运行状态,提前发现潜在的风险,如 Broker 的 CPU、内存使用率过高,消息的发送和消费延迟等。同时,结合告警系统,当某些关键指标超出阈值时,能够及时通知相关人员进行处理,保障系统的稳定运行。

3. 监控指标的确定

3.1 系统指标

  • CPU 使用率:反映 Broker 服务器 CPU 的繁忙程度。过高的 CPU 使用率可能导致消息处理速度变慢,影响整体性能。
  • 内存使用率:Broker 在处理消息过程中需要占用一定的内存,监控内存使用率可以防止内存溢出等问题。
  • 磁盘使用率:消息存储在磁盘上,磁盘空间不足可能导致消息无法正常存储,因此需要监控磁盘使用率。

3.2 消息指标

  • 消息发送成功率:衡量 Producer 发送消息到 Broker 的成功比例。成功率低可能意味着网络问题、Broker 负载过高或配置错误等。
  • 消息发送 TPS(Transactions Per Second):即每秒发送的消息数量,反映了消息的生产速度。
  • 消息消费成功率:表示 Consumer 从 Broker 拉取消息并成功处理的比例。消费成功率低可能是消费逻辑出现异常。
  • 消息消费 TPS:每秒消费的消息数量,体现了消息的消费速度。
  • 消息积压数量:如果消费速度低于生产速度,就会导致消息积压。监控消息积压数量可以及时发现潜在的消费能力不足问题。

4. 监控系统搭建

4.1 使用 Prometheus 采集指标

Prometheus 是一款开源的监控系统,具有强大的数据采集和查询功能。要使用 Prometheus 监控 RocketMQ,我们需要借助 RocketMQ Exporter。

  • 安装 RocketMQ Exporter: 首先,从官方仓库下载 RocketMQ Exporter 的二进制文件。假设下载的文件名为 rocketmq_exporter-0.3.0.linux-amd64.tar.gz,解压该文件:
tar -zxvf rocketmq_exporter-0.3.0.linux-amd64.tar.gz
cd rocketmq_exporter-0.3.0.linux-amd64
  • 配置 RocketMQ Exporter: 编辑 config.yml 文件,配置 NameServer 的地址:
rocketmq:
  nameSrvAddr: 192.168.1.100:9876;192.168.1.101:9876

这里假设 NameServer 的地址为 192.168.1.100:9876192.168.1.101:9876

  • 启动 RocketMQ Exporter: 在解压后的目录下执行以下命令启动 Exporter:
nohup./rocketmq_exporter --config.file=config.yml &

RocketMQ Exporter 启动后,会定期从 RocketMQ 的 NameServer 获取各个 Broker 的指标信息,并暴露在指定端口(默认为 9311)上,供 Prometheus 采集。

4.2 Prometheus 配置

编辑 Prometheus 的配置文件 prometheus.yml,添加 RocketMQ Exporter 的采集任务:

scrape_configs:
  - job_name: 'rocketmq'
    static_configs:
      - targets: ['192.168.1.102:9311']
    metrics_path: /metrics
    params:
      module: [http_2xx]
    relabel_configs:
      - source_labels: [__address__]
        target_label: __param_target
      - source_labels: [__param_target]
        target_label: instance
      - target_label: __address__
        replacement: 192.168.1.102:9311

这里假设 RocketMQ Exporter 运行在 192.168.1.102 这台机器上,端口为 9311。修改完成后,重启 Prometheus 使配置生效。

4.3 Grafana 可视化

Grafana 是一款功能强大的可视化工具,可以将 Prometheus 采集到的数据以图表的形式展示出来。

  • 安装 Grafana: 根据操作系统类型,从 Grafana 官方网站下载对应的安装包并安装。例如,在 Ubuntu 系统上,可以使用以下命令安装:
sudo apt-get install -y apt-transport-https
sudo apt-get install -y software-properties-common wget
wget -q -O - https://packages.grafana.com/gpg.key | sudo apt-key add -
sudo add-apt-repository "deb https://packages.grafana.com/oss/deb stable main"
sudo apt-get update
sudo apt-get install grafana
sudo systemctl start grafana-server
sudo systemctl enable grafana-server
  • 配置 Grafana: 登录 Grafana(默认地址为 http://localhost:3000,默认用户名和密码为 admin),添加 Prometheus 数据源。在 Grafana 界面中,依次点击 Configuration -> Data Sources,然后点击 Add data source,选择 Prometheus,在 URL 字段中填写 Prometheus 的地址(例如 http://192.168.1.103:9090,假设 Prometheus 运行在 192.168.1.103 上,端口为 9090),保存配置。
  • 导入 RocketMQ 监控面板: Grafana 官方提供了一些 RocketMQ 的监控面板模板。可以在 Grafana 界面中,依次点击 Dashboards -> Import,然后输入模板 ID(如 11176),选择之前配置的 Prometheus 数据源,点击 Import 即可导入监控面板。导入成功后,就可以在 Grafana 中查看 RocketMQ 的各项监控指标图表。

5. 告警系统搭建

5.1 使用 Alertmanager 进行告警

Alertmanager 是 Prometheus 生态系统中的告警组件,负责接收 Prometheus 发送的告警信息,并根据配置进行分组、抑制和发送通知。

  • 安装 Alertmanager: 从 Alertmanager 官方网站下载对应操作系统的二进制文件,解压后得到可执行文件 alertmanager
  • 配置 Alertmanager: 编辑 Alertmanager 的配置文件 alertmanager.yml,配置通知方式,例如使用邮件通知:
global:
  smtp_smarthost: 'your_smtp_server:587'
  smtp_from: 'your_email@example.com'
  smtp_auth_username: 'your_email@example.com'
  smtp_auth_password: 'your_password'
  smtp_require_tls: true

route:
  group_by: ['alertname']
  group_wait: 30s
  group_interval: 5m
  repeat_interval: 12h
  receiver: 'email'

receivers:
 - name: 'email'
   email_configs:
   - to: 'admin@example.com'
     subject: '[RocketMQ Alert] {{.CommonLabels.alertname }}'
     html: |-
       <html>
       <body>
       <h4>告警信息</h4>
       <p>告警名称: {{.CommonLabels.alertname }}</p>
       <p>告警详情: {{.CommonAnnotations.message }}</p>
       </body>
       </html>

这里配置了使用 SMTP 服务器发送邮件通知,将告警信息发送到 admin@example.com

  • 启动 Alertmanager: 在解压后的目录下执行以下命令启动 Alertmanager:
nohup./alertmanager --config.file=alertmanager.yml &

5.2 Prometheus 告警规则配置

在 Prometheus 的配置目录下创建一个告警规则文件,例如 rocketmq_alerts.yml,编写告警规则。以下是一些示例规则:

  • 消息发送成功率过低告警
groups:
 - name: rocketmq_alerts
   rules:
   - alert: RocketMQSendSuccessRateLow
     expr: rocketmq_producer_send_success_rate < 0.9
     for: 5m
     labels:
       severity: critical
     annotations:
       summary: 'RocketMQ 消息发送成功率过低'
       message: '消息发送成功率仅为 {{ $value }},低于阈值 0.9,可能存在网络或配置问题'
  • 消息积压数量过高告警
   - alert: RocketMQMessageBacklogHigh
     expr: rocketmq_topic_queue_backlog > 1000
     for: 10m
     labels:
       severity: critical
     annotations:
       summary: 'RocketMQ 消息积压数量过高'
       message: '消息积压数量达到 {{ $value }},超过阈值 1000,可能消费能力不足'

在 Prometheus 的配置文件 prometheus.yml 中添加告警规则文件的引用:

rule_files:
  - 'rocketmq_alerts.yml'

重启 Prometheus 使告警规则生效。当 Prometheus 检测到指标满足告警规则条件时,会将告警信息发送给 Alertmanager,Alertmanager 再根据配置将告警通知发送给相关人员。

6. 代码示例

6.1 RocketMQ Producer 示例(Java)

以下是一个简单的 RocketMQ Producer 代码示例,用于发送消息:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class Producer {
    public static void main(String[] args) throws Exception {
        // 创建 DefaultMQProducer 实例
        DefaultMQProducer producer = new DefaultMQProducer("example_group");
        // 设置 NameServer 地址
        producer.setNamesrvAddr("192.168.1.100:9876;192.168.1.101:9876");
        // 启动 Producer
        producer.start();

        for (int i = 0; i < 10; i++) {
            // 创建消息实例
            Message msg = new Message("example_topic",
                    "TagA",
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            // 发送消息
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }

        // 关闭 Producer
        producer.shutdown();
    }
}

在这个示例中,首先创建了一个 DefaultMQProducer 实例,并设置了 NameServer 地址。然后启动 Producer,循环发送 10 条消息到指定的主题 example_topic。最后关闭 Producer。

6.2 RocketMQ Consumer 示例(Java)

以下是一个简单的 RocketMQ Consumer 代码示例,用于消费消息:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {
    public static void main(String[] args) throws Exception {
        // 创建 DefaultMQPushConsumer 实例
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group");
        // 设置 NameServer 地址
        consumer.setNamesrvAddr("192.168.1.100:9876;192.168.1.101:9876");
        // 设置消费策略为从最新消息开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        // 订阅主题和标签
        consumer.subscribe("example_topic", "TagA");

        // 注册消息监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    try {
                        System.out.println("收到消息: " + new String(msg.getBody()));
                    } catch (Exception e) {
                        e.printStackTrace();
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 启动 Consumer
        consumer.start();
        System.out.println("Consumer 启动成功");
    }
}

在这个示例中,创建了一个 DefaultMQPushConsumer 实例,并设置了 NameServer 地址和消费策略。然后订阅了 example_topic 主题下的 TagA 标签。接着注册了一个消息监听器,在监听器中处理接收到的消息。最后启动 Consumer。

通过以上步骤,我们完成了 RocketMQ 监控与告警系统的搭建,并给出了相关的代码示例,希望能帮助你更好地管理和维护 RocketMQ 消息队列在后端开发中的应用。在实际应用中,你可以根据具体业务需求对监控指标、告警规则等进行进一步的优化和定制。