RocketMQ 消息队列的监控指标体系
RocketMQ 基础概念回顾
在深入探讨 RocketMQ 的监控指标体系之前,让我们先简要回顾一下 RocketMQ 的核心概念。RocketMQ 是一款分布式消息中间件,主要由 NameServer、Broker、Producer 和 Consumer 组成。
- NameServer:作为轻量级的元数据服务器,提供了 Broker 的注册、发现等功能。NameServer 之间相互独立,不进行数据同步,Broker 会定期向所有 NameServer 发送心跳包,上报自身的状态。
- Broker:负责消息的存储、转发等核心功能。它接收生产者发送的消息,存储到本地文件系统中,并为消费者提供拉取消息的服务。Broker 分为 Master 和 Slave 两种角色,Master 负责处理读写请求,Slave 则用于数据备份,提高系统的可用性。
- Producer:消息的生产者,负责将业务系统中的消息发送到 RocketMQ 集群。Producer 可以分为多种类型,如 DefaultMQProducer、TransactionMQProducer 等,以满足不同的业务需求。
- Consumer:消息的消费者,从 Broker 拉取消息并进行处理。Consumer 支持集群消费和广播消费两种模式,集群消费模式下,同一消息只会被集群内的一个消费者实例消费;广播消费模式下,同一消息会被集群内的所有消费者实例消费。
监控指标体系的重要性
在分布式系统中,监控指标体系是保障系统稳定运行、快速定位问题的关键手段。对于 RocketMQ 消息队列而言,一个完善的监控指标体系具有以下重要意义:
- 性能评估:通过监控消息的发送和消费性能指标,如发送延迟、消费速率等,可以评估 RocketMQ 集群是否满足业务的性能需求,以便及时调整集群配置或优化业务逻辑。
- 故障预警:实时监控关键指标,如 Broker 的磁盘使用率、内存使用率等,当指标超出正常范围时及时发出预警,使运维人员能够在故障发生前采取措施,避免系统故障对业务造成影响。
- 问题定位:当系统出现异常时,详细的监控指标数据可以帮助开发和运维人员快速定位问题的根源,例如判断是生产者发送消息异常、消费者消费消息异常还是 Broker 自身出现故障等。
关键监控指标分类
RocketMQ 的监控指标可以分为以下几类:
- 消息发送指标
- 发送成功率:指生产者成功发送到 Broker 的消息数量与总发送消息数量的比值。计算公式为:
发送成功率 = 成功发送消息数 / 总发送消息数
。发送成功率是衡量生产者与 Broker 之间连接稳定性以及消息发送逻辑正确性的重要指标。如果发送成功率较低,可能是网络问题、Broker 负载过高或者生产者配置错误等原因导致。 - 发送延迟:即从生产者调用发送方法到收到 Broker 响应的时间间隔。发送延迟反映了消息从生产者到 Broker 的传输效率。过高的发送延迟可能会影响业务的实时性,例如在实时交易系统中,消息发送延迟过高可能导致交易数据的处理不及时。
- TPS(Transactions Per Second):每秒发送的消息数量。TPS 可以直观地反映生产者的消息生产能力,通过监控 TPS 可以评估业务的消息产生量是否在 RocketMQ 集群的处理能力范围内。
- 发送成功率:指生产者成功发送到 Broker 的消息数量与总发送消息数量的比值。计算公式为:
- 消息消费指标
- 消费成功率:指消费者成功处理的消息数量与从 Broker 拉取的消息数量的比值。计算公式为:
消费成功率 = 成功消费消息数 / 拉取消息数
。消费成功率是衡量消费者处理消息逻辑正确性的关键指标,如果消费成功率较低,可能是消费者代码存在 bug、依赖的外部系统异常或者消息格式不正确等原因导致。 - 消费速率:即单位时间内消费者处理的消息数量。消费速率反映了消费者的消息处理能力,与业务的处理逻辑复杂度、消费者的资源配置等因素有关。如果消费速率过低,可能导致消息在 Broker 端积压,影响业务的正常运行。
- 消息积压量:指 Broker 中尚未被消费者拉取和处理的消息数量。消息积压量是一个非常关键的指标,它直接反映了消息生产和消费之间的平衡关系。当消息积压量持续增长时,说明消费速度跟不上生产速度,需要及时排查原因并采取措施,如增加消费者实例、优化消费逻辑等。
- 消费成功率:指消费者成功处理的消息数量与从 Broker 拉取的消息数量的比值。计算公式为:
- Broker 指标
- 磁盘使用率:Broker 需要将消息持久化到磁盘,因此磁盘使用率是一个重要的指标。过高的磁盘使用率可能导致消息写入速度变慢,甚至出现磁盘空间不足的情况,影响 Broker 的正常运行。
- 内存使用率:Broker 在处理消息的过程中需要使用内存来缓存一些数据,如消息索引等。内存使用率过高可能导致 Broker 性能下降,甚至出现 OOM(Out Of Memory)错误。
- 网络流量:包括 Broker 接收和发送的网络流量。网络流量过大可能导致网络拥塞,影响消息的传输效率。监控网络流量可以帮助运维人员评估网络带宽是否满足业务需求,及时进行网络扩容。
- CPU 使用率:Broker 在处理消息的各种操作,如消息存储、转发、索引构建等过程中都会消耗 CPU 资源。CPU 使用率过高可能表示 Broker 的负载过重,需要进一步分析是哪些操作导致 CPU 消耗过大,并进行优化。
- NameServer 指标
- 注册的 Broker 数量:NameServer 负责管理 Broker 的注册信息,注册的 Broker 数量反映了当前 RocketMQ 集群的规模。如果注册的 Broker 数量突然减少,可能表示部分 Broker 出现故障,需要及时排查。
- 请求响应时间:指 NameServer 处理客户端请求(如 Broker 注册、Producer/Consumer 发现 Broker 等请求)的响应时间。请求响应时间过长可能影响 Producer 和 Consumer 与 Broker 的正常通信,进而影响整个系统的性能。
监控指标的获取方式
- RocketMQ 自带的监控工具:RocketMQ 提供了一些内置的监控方式,例如通过 Broker 的 Web 控制台可以查看部分基本指标,如消息堆积情况、Broker 的运行状态等。此外,RocketMQ 还提供了一些命令行工具,如
mqadmin
,可以获取更详细的指标信息。例如,通过mqadmin clusterList
命令可以查看集群的整体信息,包括各个 Broker 的地址、角色等;通过mqadmin topicList
命令可以查看所有主题的信息,包括每个主题的队列数量、读写权限等。 - JMX(Java Management Extensions):由于 RocketMQ 是基于 Java 开发的,因此可以通过 JMX 来获取其内部的运行指标。RocketMQ 暴露了一系列的 MBean(Managed Bean),通过 JMX 客户端可以连接到 RocketMQ 的进程,获取详细的指标数据。例如,可以通过 JConsole 等工具连接到 Broker 进程,查看 Broker 的内存使用情况、线程池状态等指标。以下是一段简单的代码示例,用于通过 JMX 获取 RocketMQ Broker 的一些指标:
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import java.util.HashMap;
import java.util.Map;
public class RocketMQJMXExample {
public static void main(String[] args) {
try {
// 设置 JMX 连接地址
String jmxUrl = "service:jmx:rmi:///jndi/rmi://localhost:9876/jmxrmi";
JMXServiceURL url = new JMXServiceURL(jmxUrl);
Map<String, Object> env = new HashMap<>();
env.put(JMXConnector.CREDENTIALS, new String[]{"admin", "admin"});
JMXConnector jmxc = JMXConnectorFactory.connect(url, env);
MBeanServerConnection mbsc = jmxc.getMBeanServerConnection();
// 获取 Broker 的 MBean 对象名
ObjectName objectName = new ObjectName("org.apache.rocketmq:type=BrokerController");
// 获取 Broker 的总消息数指标
Long totalMessageCount = (Long) mbsc.getAttribute(objectName, "TotalMessageCount");
System.out.println("Total Message Count: " + totalMessageCount);
// 关闭 JMX 连接
jmxc.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
- 集成第三方监控系统:为了实现更全面、灵活的监控,通常会将 RocketMQ 与第三方监控系统集成,如 Prometheus + Grafana。Prometheus 是一款开源的监控系统,具有强大的数据采集和存储能力,而 Grafana 则是一款数据可视化工具,可以将 Prometheus 采集到的数据以图表的形式直观地展示出来。要将 RocketMQ 与 Prometheus 集成,需要使用 RocketMQ Exporter。RocketMQ Exporter 是一个将 RocketMQ 的指标转换为 Prometheus 可识别格式的工具。以下是使用 Docker 部署 RocketMQ Exporter 的简单步骤:
- 拉取 RocketMQ Exporter 镜像:
docker pull styletang/rocketmq-exporter:v0.4.0
- 运行容器:
docker run -d -p 9311:9311 -e "NAMESRV_ADDR=rocketmq-namesrv1:9876;rocketmq-namesrv2:9876" styletang/rocketmq-exporter:v0.4.0
- 配置 Prometheus:在 Prometheus 的配置文件
prometheus.yml
中添加以下内容:
- 拉取 RocketMQ Exporter 镜像:
scrape_configs:
- job_name: 'rocketmq'
static_configs:
- targets: ['localhost:9311']
- 启动 Prometheus:`prometheus --config.file=prometheus.yml`
- 配置 Grafana:在 Grafana 中添加 Prometheus 数据源,并导入 RocketMQ 相关的 Dashboard 模板,即可在 Grafana 中查看 RocketMQ 的监控图表。
消息发送指标监控实现
- 发送成功率监控:在生产者代码中,可以通过记录每次发送消息的结果来计算发送成功率。以下是一个简单的示例,使用
DefaultMQProducer
发送消息并统计发送成功率:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class ProducerSuccessRateExample {
private static int totalSendCount = 0;
private static int successSendCount = 0;
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i < 100; i++) {
Message message = new Message("TopicTest", ("Hello RocketMQ " + i).getBytes("UTF-8"));
SendResult sendResult = producer.send(message);
totalSendCount++;
if (sendResult.getSendStatus().equals(SendStatus.SEND_OK)) {
successSendCount++;
}
}
double successRate = (double) successSendCount / totalSendCount;
System.out.println("Send Success Rate: " + successRate);
producer.shutdown();
}
}
- 发送延迟监控:可以通过记录消息发送的开始时间和收到响应的时间来计算发送延迟。以下是在上述示例基础上添加发送延迟监控的代码:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class ProducerLatencyExample {
private static long totalLatency = 0;
private static int sendCount = 0;
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i < 100; i++) {
Message message = new Message("TopicTest", ("Hello RocketMQ " + i).getBytes("UTF-8"));
long startTime = System.currentTimeMillis();
SendResult sendResult = producer.send(message);
long endTime = System.currentTimeMillis();
long latency = endTime - startTime;
totalLatency += latency;
sendCount++;
}
double averageLatency = (double) totalLatency / sendCount;
System.out.println("Average Send Latency: " + averageLatency + " ms");
producer.shutdown();
}
}
- TPS 监控:通过记录单位时间内发送的消息数量来计算 TPS。以下是在上述示例基础上添加 TPS 监控的代码:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class ProducerTPSExample {
private static int sendCount = 0;
private static long startTime = System.currentTimeMillis();
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i < 100; i++) {
Message message = new Message("TopicTest", ("Hello RocketMQ " + i).getBytes("UTF-8"));
SendResult sendResult = producer.send(message);
sendCount++;
}
long endTime = System.currentTimeMillis();
double tps = (double) sendCount / ((endTime - startTime) / 1000);
System.out.println("TPS: " + tps + " messages per second");
producer.shutdown();
}
}
消息消费指标监控实现
- 消费成功率监控:在消费者代码中,可以通过记录每次消费消息的结果来计算消费成功率。以下是一个简单的示例,使用
DefaultMQPushConsumer
消费消息并统计消费成功率:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class ConsumerSuccessRateExample {
private static int totalConsumeCount = 0;
private static int successConsumeCount = 0;
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
totalConsumeCount++;
for (MessageExt msg : msgs) {
try {
// 模拟业务处理
System.out.println(new String(msg.getBody(), "UTF-8"));
successConsumeCount++;
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started");
// 等待一段时间,让消费者消费一些消息
Thread.sleep(5000);
double successRate = (double) successConsumeCount / totalConsumeCount;
System.out.println("Consume Success Rate: " + successRate);
consumer.shutdown();
}
}
- 消费速率监控:可以通过记录单位时间内消费的消息数量来计算消费速率。以下是在上述示例基础上添加消费速率监控的代码:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class ConsumerRateExample {
private static int consumeCount = 0;
private static long startTime = System.currentTimeMillis();
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
consumeCount += msgs.size();
for (MessageExt msg : msgs) {
try {
// 模拟业务处理
System.out.println(new String(msg.getBody(), "UTF-8"));
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started");
// 等待一段时间,让消费者消费一些消息
Thread.sleep(5000);
long endTime = System.currentTimeMillis();
double consumeRate = (double) consumeCount / ((endTime - startTime) / 1000);
System.out.println("Consume Rate: " + consumeRate + " messages per second");
consumer.shutdown();
}
}
- 消息积压量监控:可以通过
mqadmin
命令或者集成第三方监控系统来获取消息积压量。使用mqadmin
命令获取消息积压量的示例如下:
mqadmin queryMsgById -n localhost:9876 -i <messageId>
通过上述命令可以获取指定消息的详细信息,包括消息所在的队列、偏移量等。通过分析队列的偏移量信息,可以计算出当前队列的消息积压量。在集成第三方监控系统(如 Prometheus + Grafana)后,可以在 Grafana 中直观地查看消息积压量的变化趋势,以便及时发现消息积压问题。
Broker 指标监控实现
- 磁盘使用率监控:在 Linux 系统中,可以通过
df -h
命令获取磁盘使用情况。为了实现对 Broker 磁盘使用率的实时监控,可以编写一个脚本定期执行该命令,并将结果发送到监控系统(如 Prometheus)。以下是一个简单的 Shell 脚本示例:
#!/bin/bash
diskUsage=$(df -h /path/to/rocketmq/store | awk 'NR==2{print $5}' | sed 's/%//')
echo "rocketmq_broker_disk_usage $diskUsage" | curl --data-binary @- http://localhost:9091/metrics/job/rocketmq_broker_disk
上述脚本通过 df -h
命令获取 RocketMQ 存储目录的磁盘使用率,并将结果发送到 Prometheus 的指定接口。
2. 内存使用率监控:同样在 Linux 系统中,可以通过 free -h
命令获取内存使用情况。编写类似的脚本可以实现对 Broker 内存使用率的监控。以下是一个简单的 Shell 脚本示例:
#!/bin/bash
totalMemory=$(free -h | awk 'NR==2{print $2}' | sed 's/G//')
usedMemory=$(free -h | awk 'NR==2{print $3}' | sed 's/G//')
memoryUsage=$(echo "scale=2; $usedMemory / $totalMemory * 100" | bc)
echo "rocketmq_broker_memory_usage $memoryUsage" | curl --data-binary @- http://localhost:9091/metrics/job/rocketmq_broker_memory
- 网络流量监控:在 Linux 系统中,可以使用
ifstat
等工具获取网络流量信息。以下是一个使用ifstat
获取 Broker 网络流量的 Shell 脚本示例:
#!/bin/bash
interface="eth0" # 根据实际情况修改网络接口
rxBytes=$(ifstat -i $interface 1 1 | awk 'NR==3{print $2}')
txBytes=$(ifstat -i $interface 1 1 | awk 'NR==3{print $3}')
echo "rocketmq_broker_network_rx_bytes $rxBytes" | curl --data-binary @- http://localhost:9091/metrics/job/rocketmq_broker_network
echo "rocketmq_broker_network_tx_bytes $txBytes" | curl --data-binary @- http://localhost:9091/metrics/job/rocketmq_broker_network
- CPU 使用率监控:在 Linux 系统中,可以通过
top -bn1
命令获取 CPU 使用情况。以下是一个简单的 Shell 脚本示例,用于获取 Broker 的 CPU 使用率:
#!/bin/bash
cpuUsage=$(top -bn1 | grep "Cpu(s)" | awk '{print $2 + $4}')
echo "rocketmq_broker_cpu_usage $cpuUsage" | curl --data-binary @- http://localhost:9091/metrics/job/rocketmq_broker_cpu
NameServer 指标监控实现
- 注册的 Broker 数量监控:可以通过
mqadmin clusterList
命令获取当前注册的 Broker 数量。编写一个脚本定期执行该命令,并将结果发送到监控系统(如 Prometheus)。以下是一个简单的 Shell 脚本示例:
#!/bin/bash
brokerCount=$(mqadmin clusterList -n localhost:9876 | grep "Broker Name" | wc -l)
echo "rocketmq_nameserver_broker_count $brokerCount" | curl --data-binary @- http://localhost:9091/metrics/job/rocketmq_nameserver_broker
- 请求响应时间监控:要监控 NameServer 的请求响应时间,可以在 NameServer 的代码中添加一些统计逻辑。例如,在处理客户端请求的方法中记录请求开始时间和结束时间,计算响应时间并上报到监控系统。以下是一个简单的示例,假设 NameServer 中有一个处理注册 Broker 请求的方法
registerBroker
:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class NameServer {
private static final Logger logger = LoggerFactory.getLogger(NameServer.class);
public void registerBroker(BrokerInfo brokerInfo) {
long startTime = System.currentTimeMillis();
// 处理注册逻辑
//...
long endTime = System.currentTimeMillis();
long responseTime = endTime - startTime;
logger.info("Register Broker Request Response Time: {} ms", responseTime);
// 上报响应时间到监控系统
//...
}
}
指标阈值设定与预警
- 阈值设定原则:指标阈值的设定需要综合考虑业务需求、系统历史运行数据以及硬件资源等因素。例如,对于消息发送成功率,业务要求可能是不低于 99%;对于 Broker 的磁盘使用率,考虑到系统的稳定性和数据写入性能,阈值可以设定为 80%。在设定阈值时,既要避免阈值过于宽松导致问题发现不及时,又要避免阈值过于严格导致频繁误报警。
- 预警方式:常见的预警方式包括邮件、短信、即时通讯工具(如微信、钉钉)等。可以通过与监控系统集成,当指标超出阈值时,监控系统自动触发预警通知。例如,在 Prometheus 中,可以通过配置 Alertmanager 来实现预警功能。以下是一个简单的 Alertmanager 配置示例,用于在消息积压量超过 1000 条时发送邮件预警:
global:
resolve_timeout: 5m
route:
group_by: ['alertname']
group_wait: 30s
group_interval: 5m
repeat_interval: 12h
receiver: 'email'
receivers:
- name: 'email'
email_configs:
- to: 'admin@example.com'
from: 'alert@example.com'
subject: 'RocketMQ Message Backlog Alert'
html: '{{ template "email.default.html" . }}'
templates:
- '/etc/alertmanager/template/*.tmpl'
在上述配置中,当 Prometheus 检测到消息积压量相关的指标超出设定阈值时,会将报警信息发送到 Alertmanager,Alertmanager 根据配置将邮件发送到指定的邮箱。
监控指标的分析与优化
- 性能瓶颈分析:通过对监控指标的持续分析,可以发现系统的性能瓶颈。例如,如果消息发送延迟持续升高,可能是 Broker 的网络带宽不足或者生产者的发送线程池配置不合理。通过进一步分析网络流量指标、CPU 使用率等相关指标,可以确定具体的瓶颈点。如果发现网络带宽不足,可以考虑进行网络扩容;如果是生产者线程池配置问题,可以调整线程池的参数,如增加线程数量等。
- 业务优化:监控指标不仅可以反映系统的技术性能,还可以为业务优化提供参考。例如,如果发现某个主题的消息消费速率较低,可能是业务处理逻辑过于复杂导致。此时,可以对业务逻辑进行优化,如采用异步处理、缓存等技术,提高消息的消费速率,从而更好地满足业务需求。
- 系统容量规划:通过长期监控消息发送和消费的 TPS 等指标,可以预测业务的发展趋势,从而进行系统容量规划。如果预计业务的消息产生量将大幅增长,提前对 RocketMQ 集群进行扩容,如增加 Broker 节点、调整存储资源等,以确保系统能够持续稳定地运行。
综上所述,建立一个完善的 RocketMQ 消息队列监控指标体系,并通过有效的方式获取、分析和利用这些指标数据,对于保障 RocketMQ 系统的稳定运行、提高业务性能以及优化系统架构都具有至关重要的意义。在实际应用中,需要根据具体的业务场景和系统需求,不断完善和优化监控指标体系,以满足日益增长的业务需求。