MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Kafka 的监控指标与监控工具介绍

2022-09-082.0k 阅读

Kafka 监控指标概述

Kafka 作为分布式流处理平台,在大规模数据处理场景中广泛应用。为了确保 Kafka 集群高效、稳定运行,监控其各项指标至关重要。Kafka 的监控指标涵盖多个方面,包括 broker 层面、topic 层面以及消费者层面等。这些指标能帮助我们洞察 Kafka 集群的运行状况,提前发现潜在问题并及时解决。

Broker 层面监控指标

  1. CPU 使用率

    • 含义:反映 Kafka broker 所在服务器 CPU 的繁忙程度。过高的 CPU 使用率可能导致 Kafka 处理消息延迟,甚至出现消息堆积。
    • 影响:如果 CPU 使用率长期接近 100%,Kafka 可能无法及时处理新消息的请求,导致客户端等待时间过长。同时,在进行数据压缩、解压缩等操作时,也依赖 CPU 资源,高 CPU 使用率可能影响这些操作的效率。
    • 获取方式:在 Linux 系统中,可以使用 tophtop 命令查看 Kafka broker 进程对应的 CPU 使用率。在 Kafka 自带的 JMX(Java Management Extensions)指标体系中,也能获取到与 CPU 相关的指标,如 kafka.server:type=KafkaServer,name=CPUPercent
  2. 内存使用率

    • 含义:体现 Kafka broker 进程占用服务器内存的情况。Kafka 使用内存来缓存消息,合理的内存使用能提高消息读写性能。
    • 影响:内存不足可能导致消息无法及时缓存,进而影响 Kafka 的吞吐量。如果 Kafka 频繁进行内存交换(swap),会严重降低性能。
    • 获取方式:通过 free 命令可查看系统整体内存使用情况,进而推断 Kafka broker 的内存使用。在 JMX 指标中,kafka.server:type=KafkaServer,name=MemoryPercent 可获取 Kafka 进程的内存使用率相关指标。
  3. 磁盘 I/O

    • 含义:涉及 Kafka 对磁盘的读写操作。Kafka 将消息持久化到磁盘,磁盘 I/O 的性能直接影响消息的写入和读取速度。
    • 影响:缓慢的磁盘 I/O 可能导致消息写入延迟,尤其是在高吞吐量场景下。如果磁盘出现故障,可能导致数据丢失。
    • 获取方式:在 Linux 系统中,iostat 命令可用于监控磁盘 I/O 情况,例如查看每秒的读写次数(r/sw/s)、每秒读写的数据量(rKB/swKB/s)等。Kafka 自身也提供了一些与磁盘相关的 JMX 指标,如 kafka.log:type=Log,name=Size 可获取日志文件的大小,间接反映磁盘使用情况。
  4. 网络流量

    • 含义:表示 Kafka broker 与客户端、其他 broker 之间的数据传输量。网络流量的大小反映了 Kafka 集群的数据流动情况。
    • 影响:高网络流量可能导致网络拥塞,影响消息的传输速度。如果网络不稳定,可能会出现消息丢失或重传的情况。
    • 获取方式:在 Linux 系统中,ifstat 等工具可用于监控网络接口的流量情况,如每秒的接收和发送字节数。Kafka 的 JMX 指标中,kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent 可反映网络处理器的空闲百分比,与网络流量有一定关联。
  5. 活跃连接数

    • 含义:指当前与 Kafka broker 建立的活跃 TCP 连接数量。连接数的多少反映了客户端与 Kafka 交互的频繁程度。
    • 影响:过多的活跃连接可能消耗 broker 的资源,导致性能下降。如果连接数突然激增,可能表示有异常的客户端行为或系统故障。
    • 获取方式:通过 netstat 命令可以查看 Kafka broker 监听端口的连接数,例如 netstat -an | grep :9092 | grep ESTABLISHED | wc -l 可统计与 Kafka 默认端口 9092 建立的 ESTABLISHED 状态连接数。在 Kafka 的 JMX 指标中,kafka.network:type=SocketServer,name=ActiveConnectionsCount 可直接获取活跃连接数。
  6. 分区副本状态

    • 含义:Kafka 通过分区副本机制保证数据的高可用性。分区副本状态反映了各个分区副本是否正常工作。
    • 影响:如果某个分区的副本出现故障,可能导致数据丢失或读取不一致。同时,副本的同步情况也会影响 Kafka 的整体性能。
    • 获取方式:可以使用 Kafka 自带的命令行工具 kafka-topics.sh 来查看分区副本状态,例如 kafka - topics.sh --describe --bootstrap - servers <bootstrap - servers> --topic <topic - name>,该命令会显示每个分区的 leader 副本以及 follower 副本的状态等信息。在 JMX 指标中,kafka.server:type=ReplicaFetcherManager,name=UnderReplicatedPartitions 可获取未充分复制的分区数量。

Topic 层面监控指标

  1. 消息生产速率
    • 含义:指单位时间内生产者向 topic 写入的消息数量或字节数。它反映了数据流入 Kafka 的速度。
    • 影响:生产速率过高可能导致 topic 的分区负载不均衡,或者超出 Kafka 集群的处理能力,进而影响整个集群的性能。如果生产速率突然下降,可能表示生产者端出现问题,如网络故障、应用程序逻辑错误等。
    • 获取方式:可以通过自定义生产者拦截器来统计消息生产速率。以下是一个简单的 Java 代码示例:
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

public class MessageRateInterceptor implements ProducerInterceptor<String, String> {
    private AtomicLong messageCount = new AtomicLong(0);
    private long startTime = System.currentTimeMillis();

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        messageCount.incrementAndGet();
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // 可在此处进行更复杂的统计逻辑
    }

    @Override
    public void close() {
        long endTime = System.currentTimeMillis();
        long elapsedTime = endTime - startTime;
        double rate = messageCount.get() / (elapsedTime / 1000.0);
        System.out.println("Message production rate: " + rate + " messages per second");
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // 配置相关逻辑
    }
}

在生产者配置中添加该拦截器:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("interceptor.classes", "MessageRateInterceptor");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);

此外,Kafka 的 JMX 指标中,kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=<topic - name> 可获取指定 topic 的每秒消息流入量。

  1. 消息消费速率
    • 含义:表示单位时间内消费者从 topic 读取的消息数量或字节数。它体现了数据从 Kafka 流出的速度。
    • 影响:消费速率过低可能导致消息在 topic 中堆积,占用磁盘空间,同时可能影响业务流程的正常进行。如果消费速率过高,可能超过消费者应用程序的处理能力,导致数据处理不及时。
    • 获取方式:类似于生产者拦截器,可通过自定义消费者拦截器来统计消费速率。以下是一个简单的 Java 代码示例:
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

public class ConsumptionRateInterceptor implements ConsumerInterceptor<String, String> {
    private AtomicLong messageCount = new AtomicLong(0);
    private long startTime = System.currentTimeMillis();

    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        messageCount.addAndGet(records.count());
        return records;
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        // 可在此处进行更复杂的统计逻辑
    }

    @Override
    public void close() {
        long endTime = System.currentTimeMillis();
        long elapsedTime = endTime - startTime;
        double rate = messageCount.get() / (elapsedTime / 1000.0);
        System.out.println("Message consumption rate: " + rate + " messages per second");
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // 配置相关逻辑
    }
}

在消费者配置中添加该拦截器:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test - group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("interceptor.classes", "ConsumptionRateInterceptor");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

在 Kafka 的 JMX 指标中,kafka.consumer:type=ConsumerFetcherManager,clientId=consumer - 1,topic=<topic - name>,name=RecordsConsumedPerSec 可获取指定 topic 特定消费者组每秒消费的记录数。

  1. 消息堆积量
    • 含义:指 topic 中尚未被消费者消费的消息数量。消息堆积可能是由于消费速率低于生产速率等原因导致。
    • 影响:大量的消息堆积会占用磁盘空间,增加 Kafka 集群的存储压力。同时,长时间的堆积可能导致数据处理延迟,影响业务的时效性。
    • 获取方式:可以通过 Kafka 自带的 kafka - consumer - offsets.sh 命令结合 --describe 选项来查看特定 topic 和消费者组的偏移量信息,进而计算出消息堆积量。例如:
kafka - consumer - offsets.sh --bootstrap - servers <bootstrap - servers> --group <group - id> --describe

通过该命令获取已提交的偏移量和 log 末端偏移量(log end offset,LEO),两者差值即为消息堆积量。在 JMX 指标中,kafka.consumer:type=ConsumerFetcherManager,clientId=consumer - 1,topic=<topic - name>,name=PartitionLag 可获取特定消费者组在每个分区上的滞后量(即消息堆积量)。

  1. 分区负载均衡
    • 含义:反映 topic 的各个分区在不同 broker 上的负载分布情况。良好的分区负载均衡能充分利用 Kafka 集群的资源。
    • 影响:如果分区负载不均衡,可能导致部分 broker 压力过大,而其他 broker 资源闲置,影响整个集群的性能和可扩展性。
    • 获取方式:使用 kafka - topics.sh 命令的 --describe 选项查看 topic 的分区信息,包括每个分区的 leader 副本所在的 broker 等。通过分析这些信息,可以判断分区负载是否均衡。例如:
kafka - topics.sh --describe --bootstrap - servers <bootstrap - servers> --topic <topic - name>

观察每个分区的 leader 副本分布,如果某个 broker 上承载了过多分区的 leader 副本,可能表示负载不均衡。此外,Kafka 的 JMX 指标中,kafka.server:type=ReplicaManager,name=PartitionCount 可获取每个 broker 上的分区数量,结合分区在不同 broker 上的分布情况进一步分析负载均衡。

消费者层面监控指标

  1. 消费者组滞后量
    • 含义:指消费者组中所有消费者实例滞后于最新消息的总偏移量。它反映了消费者组整体处理消息的速度与消息生产速度的差距。
    • 影响:消费者组滞后量过大,表明消费者组处理消息的能力不足,可能导致消息长时间堆积,影响业务的实时性。
    • 获取方式:可以使用 kafka - consumer - offsets.sh 命令结合 --describe 选项来查看特定消费者组的滞后量。例如:
kafka - consumer - offsets.sh --bootstrap - servers <bootstrap - servers> --group <group - id> --describe

计算每个分区的滞后量(LEO - 已提交偏移量),然后将所有分区的滞后量相加,即可得到消费者组的滞后量。在 Kafka 的 JMX 指标中,kafka.consumer:type=ConsumerFetcherManager,clientId=consumer - 1,group=<group - id>,name=ConsumerLag 可直接获取消费者组的滞后量。

  1. 消费者心跳频率

    • 含义:消费者通过定期向 Kafka 集群发送心跳来表明自己的存活状态。心跳频率反映了消费者与 Kafka 集群之间的连接稳定性。
    • 影响:如果心跳频率过低或心跳中断,Kafka 可能会认为消费者已死亡,从而触发重新平衡(rebalance)操作。频繁的重新平衡会影响消费者组的稳定性,导致消息处理中断。
    • 获取方式:在消费者配置中,可以通过设置 heartbeat.interval.ms 参数来控制心跳频率。通过查看 Kafka 日志或使用 Kafka 提供的监控工具(如 Kafka Eagle),可以观察到消费者的心跳情况。在 JMX 指标中,kafka.consumer:type=ConsumerCoordinator,clientId=consumer - 1,group=<group - id>,name=HeartbeatRateAndTimeMs 可获取消费者心跳的频率和时间相关指标。
  2. 消费者吞吐量

    • 含义:类似于消息消费速率,但更侧重于消费者应用程序处理消息的实际吞吐量,包括消息处理逻辑的耗时等因素。
    • 影响:消费者吞吐量低可能是由于消费者应用程序的性能瓶颈,如数据库写入速度慢、复杂的业务逻辑处理等。这会影响整个业务流程的效率。
    • 获取方式:可以在消费者应用程序内部,通过记录处理消息的开始时间和结束时间,统计单位时间内处理的消息数量,从而计算出吞吐量。以下是一个简单的 Java 代码示例:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;

public class ConsumerThroughputExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test - group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test - topic"));

        AtomicLong messageCount = new AtomicLong(0);
        long startTime = System.currentTimeMillis();

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                long startProcessTime = System.currentTimeMillis();
                // 模拟业务处理逻辑
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                long endProcessTime = System.currentTimeMillis();
                messageCount.incrementAndGet();
                long elapsedTime = endProcessTime - startTime;
                if (elapsedTime >= 1000) {
                    double throughput = messageCount.get() / (elapsedTime / 1000.0);
                    System.out.println("Consumer throughput: " + throughput + " messages per second");
                    messageCount.set(0);
                    startTime = System.currentTimeMillis();
                }
            }
        }
    }
}

Kafka 监控工具介绍

  1. Kafka 自带 JMX 监控
    • 原理:Kafka 基于 Java 开发,利用 JMX 技术暴露了大量的监控指标。JMX 允许开发者通过 MBean(Managed Bean)来管理和监控 Java 应用程序。Kafka 将各种内部状态和统计信息封装成 MBean,通过 JMX 接口对外提供。
    • 使用方法:首先需要在 Kafka broker 启动脚本中配置 JMX 相关参数,例如:
export JMX_PORT="9999"
export JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=<your - server - ip>"

然后可以使用工具如 jconsole(Java 自带)或 jvisualvm 来连接 Kafka broker 的 JMX 端口(如 9999),查看各种监控指标。以 jconsole 为例,启动 jconsole 后,选择远程进程,输入 Kafka broker 的 IP 和 JMX 端口,即可连接并查看 Kafka 的 JMX 指标。在 jconsole 的 “MBeans” 选项卡中,可以浏览 Kafka 各个方面的指标,如 kafka.server 下的 broker 相关指标,kafka.consumer 下的消费者相关指标等。

  • 优点:无需额外安装复杂的监控系统,简单易用,能快速获取 Kafka 内部的详细指标信息。
  • 缺点:图形化界面相对简单,对于大规模 Kafka 集群的监控不太方便,且不具备指标持久化、告警等高级功能。
  1. Kafka Eagle
    • 原理:Kafka Eagle 是一个开源的 Kafka 监控工具,它通过连接 Kafka 集群,收集 Kafka 的各种指标数据,包括 broker、topic、consumer 等层面的指标。它支持从 Kafka 自带的 JMX 接口获取指标,也可以通过 Kafka 提供的 REST API 进行数据采集。
    • 安装与使用:首先下载 Kafka Eagle 的安装包,解压后进行配置。在 conf/system.properties 文件中配置 Kafka 集群信息、Zookeeper 地址等。例如:
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=zk1:2181,zk2:2181,zk3:2181

然后启动 Kafka Eagle,通过浏览器访问其 Web 界面(默认端口 8048)。在界面上,可以直观地查看 Kafka 集群的拓扑结构、各个 topic 的生产消费速率、消息堆积情况等。还可以对消费者组进行详细监控,包括滞后量、成员信息等。

  • 优点:提供了丰富的图形化界面,方便直观地查看 Kafka 集群的整体运行状况。支持多集群管理,对于大规模 Kafka 部署非常友好。具备一定的告警功能,可以设置阈值,当指标超出阈值时发送告警通知。
  • 缺点:部署和配置相对复杂,需要对 Kafka 和 Zookeeper 有一定的了解。在高并发场景下,可能会对 Kafka 集群产生一定的性能影响。
  1. Prometheus + Grafana
    • 原理:Prometheus 是一个开源的系统监控和告警工具包,它通过 pull 模型定期从目标系统(如 Kafka)拉取指标数据。Kafka 可以通过配置 JMX Exporter 将 JMX 指标暴露为 Prometheus 可识别的格式。Grafana 是一个可视化平台,它可以从 Prometheus 获取数据,并以图表等形式进行展示。
    • 安装与配置:首先安装 Prometheus,在其配置文件 prometheus.yml 中添加 Kafka JMX Exporter 的数据源配置,例如:
scrape_configs:
  - job_name: 'kafka'
    static_configs:
      - targets: ['kafka - server:9999']
    metrics_path: /metrics
    params:
      module: [jmx]
    relabel_configs:
      - source_labels: [__address__]
        target_label: __param_target
      - source_labels: [__param_target]
        target_label: instance
      - target_label: __address__
        replacement: kafka - jmx - exporter:9101

然后安装并配置 Kafka JMX Exporter,在 Kafka broker 启动脚本中添加 JMX Exporter 相关参数,例如:

export JAVA_OPTS="$JAVA_OPTS -javaagent:/path/to/jmx_prometheus_javaagent.jar=9101:/path/to/kafka - jmx - exporter - config.yml"

配置 Grafana,添加 Prometheus 数据源,然后导入 Kafka 相关的 Grafana 仪表盘模板(如 kafka - cluster - monitoring - template.json),即可在 Grafana 中查看 Kafka 的各种监控指标图表。

  • 优点:Prometheus 强大的指标收集和查询功能,结合 Grafana 丰富的可视化选项,能够定制出非常灵活和详细的监控界面。Prometheus 的告警功能也很强大,可以根据自定义的规则发送告警。适用于大规模分布式系统的监控。
  • 缺点:部署和配置较为复杂,需要对 Prometheus、Grafana 和 Kafka JMX Exporter 都有深入了解。由于采用 pull 模型,可能存在一定的数据采集延迟。
  1. Confluent Control Center
    • 原理:Confluent Control Center 是 Confluent 公司提供的 Kafka 集群管理和监控工具。它通过与 Kafka 集群紧密集成,收集 Kafka 各个组件的详细指标,包括 broker、topic、consumer 等。它还利用 Kafka 内部的一些机制,如 Kafka Connect 来获取更全面的数据。
    • 使用方法:首先需要安装 Confluent Platform,其中包含 Control Center。安装完成后,启动 Control Center 服务。通过浏览器访问 Control Center 的 Web 界面(默认端口 9021)。在界面上,可以对 Kafka 集群进行全方位的管理和监控,包括创建和管理 topic、查看消费者组的详细信息、进行数据治理等。它还提供了性能优化建议等高级功能。
    • 优点:功能非常全面,不仅提供监控功能,还集成了 Kafka 集群的管理功能。提供了智能的性能分析和优化建议,对于 Kafka 运维人员非常有帮助。与 Confluent 生态系统其他组件无缝集成。
    • 缺点:属于商业产品,虽然有免费试用版,但功能可能受限。部署和维护成本较高,需要一定的技术支持。对 Confluent 平台有依赖,可能不适用于非 Confluent 部署的 Kafka 集群。