MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

消息队列的故障排查与定位技巧

2021-03-197.1k 阅读

消息队列常见故障类型

消息丢失

  1. 生产者端消息丢失
    • 在消息队列的使用中,生产者端可能因为网络问题、程序异常等原因导致消息未能成功发送到消息队列。例如,在使用 RabbitMQ 时,生产者调用 basicPublish 方法发送消息,如果网络突然中断,可能消息还未到达 RabbitMQ 服务器就丢失了。
    • 代码示例(Python + pika 库连接 RabbitMQ)
import pika

try:
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='test_queue')
    message = "Hello, RabbitMQ!"
    channel.basic_publish(exchange='', routing_key='test_queue', body=message)
    print(" [x] Sent 'Hello, RabbitMQ!'")
    connection.close()
except pika.exceptions.AMQPConnectionError as e:
    print(f"Connection error: {e}, message may be lost")
  • 在上述代码中,如果连接 RabbitMQ 服务器出现 AMQPConnectionError,则消息很可能丢失。为了避免这种情况,可以使用 RabbitMQ 的事务机制或者发送确认机制(publisher confirm)。
  • 事务机制示例
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='test_queue')
message = "Hello, RabbitMQ with transaction!"

try:
    channel.tx_select()
    channel.basic_publish(exchange='', routing_key='test_queue', body=message)
    channel.tx_commit()
    print(" [x] Sent 'Hello, RabbitMQ with transaction!'")
except Exception as e:
    channel.tx_rollback()
    print(f"Transaction error: {e}, message may be lost")
finally:
    connection.close()
  • 发送确认机制示例
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='test_queue')
message = "Hello, RabbitMQ with confirm!"

def confirm_callback(frame):
    if frame.method.NAME == 'Basic.Ack':
        print("Message sent successfully")
    else:
        print("Message delivery failed")

channel.confirm_delivery()
channel.add_on_return_callback(confirm_callback)
channel.basic_publish(exchange='', routing_key='test_queue', body=message, mandatory=True)
time.sleep(1)
connection.close()
  1. 消息队列内部丢失
    • 消息队列自身的故障,如磁盘故障(如果消息队列使用磁盘存储消息)、内存溢出(对于基于内存的消息队列)等都可能导致消息丢失。以 Kafka 为例,Kafka 通过多副本机制来保证消息的可靠性。但是,如果配置不当,例如副本数量设置不合理,当 leader 副本所在的节点出现故障,而 follower 副本还未完全同步消息时,就可能导致部分消息丢失。
    • Kafka 配置文件(server.properties)中与副本相关的配置项:
# 配置副本因子,一般设置为大于1
num.replica.fetchers=2
# 配置 follower 副本与 leader 副本同步消息的最大延迟时间
replica.lag.time.max.ms=10000
  • 如果 replica.lag.time.max.ms 设置过大,可能导致在 leader 副本故障时,follower 副本长时间未同步到最新消息,从而丢失消息。
  1. 消费者端消息丢失
    • 消费者从消息队列中获取消息后,在处理消息的过程中发生异常,而消息队列又配置为自动确认消息已被消费,就会导致消息丢失。例如,在使用 RabbitMQ 时,如果消费者设置 auto_ack=True
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='test_queue')

def callback(ch, method, properties, body):
    raise Exception("Simulating processing error")
    print(f"Received message: {body}")

channel.basic_consume(queue='test_queue', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
  • 在上述代码中,auto_ack=True 表示一旦消费者接收到消息,RabbitMQ 就认为该消息已被消费,即使消费者处理消息时抛出异常,消息也不会重新回到队列,从而导致消息丢失。解决方法是设置 auto_ack=False,并在消息处理成功后手动确认消息:
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='test_queue')

def callback(ch, method, properties, body):
    try:
        print(f"Received message: {body}")
        ch.basic_ack(delivery_tag = method.delivery_tag)
    except Exception as e:
        print(f"Error processing message: {e}")

channel.basic_consume(queue='test_queue', on_message_callback=callback, auto_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

消息重复消费

  1. 消费者端重复消费
    • 消息队列的可靠性机制可能会导致消息重复消费。例如,在 Kafka 中,当消费者处理完消息但还未来得及提交消费偏移量(offset)时,消费者所在节点发生故障,重新启动后,由于之前未提交偏移量,就会从上次消费的位置重新开始消费,导致部分消息被重复消费。
    • Kafka 消费者代码示例(Python + confluent - kafka 库)
from confluent_kafka import Consumer, KafkaError

c = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'test_group',
    'auto.offset.reset': 'earliest'
})

c.subscribe(['test_topic'])

while True:
    msg = c.poll(1.0)

    if msg is None:
        continue
    if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            continue
        else:
            print(msg.error())
            break

    print('Received message: {}'.format(msg.value().decode('utf - 8')))
    # 模拟处理消息过程中未及时提交偏移量
    # c.commit()

c.close()
  • 在上述代码中,如果在处理完消息后未调用 c.commit() 提交偏移量,当消费者重启时,就可能重复消费消息。为了避免重复消费,可以采用幂等性处理。即消费者在处理消息前,先检查该消息是否已经处理过。例如,可以使用数据库来记录已处理的消息 ID。
  • 使用 MySQL 记录已处理消息 ID 的示例(Python + pymysql 库)
import pymysql
from confluent_kafka import Consumer, KafkaError

# 连接数据库
conn = pymysql.connect(host='localhost', user='root', password='password', database='test')
cursor = conn.cursor()

c = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'test_group',
    'auto.offset.reset': 'earliest'
})

c.subscribe(['test_topic'])

while True:
    msg = c.poll(1.0)

    if msg is None:
        continue
    if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            continue
        else:
            print(msg.error())
            break

    message_id = msg.value().decode('utf - 8')
    # 检查消息是否已处理
    check_sql = "SELECT id FROM processed_messages WHERE message_id = %s"
    cursor.execute(check_sql, (message_id,))
    result = cursor.fetchone()
    if not result:
        print('Received message: {}'.format(message_id))
        # 处理消息
        # 记录已处理消息
        insert_sql = "INSERT INTO processed_messages (message_id) VALUES (%s)"
        cursor.execute(insert_sql, (message_id,))
        conn.commit()
    c.commit()

c.close()
cursor.close()
conn.close()
  1. 消息队列重试导致重复消费
    • 当消息队列检测到消息处理失败时,可能会进行重试,这也可能导致消息重复消费。以 RocketMQ 为例,RocketMQ 提供了消息重试机制。如果消费者消费消息失败,RocketMQ 会根据重试策略将消息重新投递给消费者。如果消费者在重试过程中没有正确处理幂等性,就会导致消息重复消费。
    • RocketMQ 的重试配置在 consumer.properties 文件中:
# 最大重试次数
consumer.maxReconsumeTimes=3
  • 消费者在处理消息时,应确保幂等性。例如,在处理订单消息时,如果是创建订单的消息,每次处理前先检查订单是否已存在,若已存在则不再重复创建。

消息积压

  1. 消费者处理能力不足导致积压
    • 当消费者的处理速度跟不上生产者的发送速度时,就会出现消息积压。例如,在电商系统中,订单生成后,生产者快速将订单消息发送到消息队列,而消费者可能因为业务逻辑复杂,如需要进行库存检查、价格计算、积分计算等操作,导致处理订单消息的速度较慢,从而使消息在队列中积压。
    • 代码示例(模拟生产者快速发送消息,消费者处理缓慢)
    • 生产者(Python + pika 库连接 RabbitMQ)
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='order_queue')

for i in range(100):
    message = f"Order {i}"
    channel.basic_publish(exchange='', routing_key='order_queue', body=message)
    print(f" [x] Sent '{message}'")
    time.sleep(0.1)

connection.close()
  • 消费者(Python + pika 库连接 RabbitMQ,处理缓慢)
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='order_queue')

def callback(ch, method, properties, body):
    print(f"Received message: {body}")
    # 模拟复杂业务处理,处理时间长
    time.sleep(1)
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(queue='order_queue', on_message_callback=callback, auto_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
  • 在上述代码中,生产者以每 0.1 秒发送一条消息的速度发送 100 条消息,而消费者处理一条消息需要 1 秒,这样很快就会出现消息积压。解决方法可以是增加消费者实例数量,提高消费者处理能力。例如,在 RabbitMQ 中,可以启动多个消费者进程同时消费消息。
  1. 消息队列配置不合理导致积压
    • 消息队列的一些配置参数可能会导致消息积压。例如,在 Kafka 中,如果分区数量设置过少,而生产者发送消息的速度又很快,就会导致单个分区内消息积压。另外,Kafka 的 log.retention.hours 参数设置过小,可能会导致旧消息被删除,而新消息又不断涌入,造成积压假象(实际上旧消息被删除了,但新消息处理不过来)。
    • Kafka 配置示例
# 分区数量
num.partitions=1
# 日志保留时间(小时)
log.retention.hours=1
  • 如果 num.partitions 设置为 1,而生产者发送消息的速率较高,就容易造成消息积压在这一个分区中。可以通过增加分区数量来缓解积压问题:
num.partitions=10
  • 同时,合理调整 log.retention.hours 参数,确保有足够的时间处理消息,例如设置为 log.retention.hours=24
  1. 网络问题导致积压
    • 网络延迟、网络抖动等问题可能会影响消息在生产者、消息队列和消费者之间的传输,从而导致消息积压。例如,在分布式系统中,生产者和消息队列可能分布在不同的机房,网络连接不稳定,导致生产者发送消息到消息队列的速度变慢,而生产者依然按照正常速度生产消息,从而造成消息在生产者端积压。同样,消费者从消息队列获取消息时,如果网络不稳定,也会导致获取消息的速度变慢,造成消息在消息队列中积压。
    • 要解决网络问题导致的积压,需要对网络进行优化,例如增加网络带宽、优化网络拓扑结构、配置合适的网络超时时间等。在代码层面,可以设置合适的网络连接超时参数。例如,在使用 Kafka 时,bootstrap.servers 配置中的连接超时参数:
bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092
# 连接超时时间(毫秒)
socket.connection.timeout.ms=10000

消息队列故障排查工具与方法

消息队列自带监控工具

  1. RabbitMQ Management Console
    • RabbitMQ 提供了一个基于 Web 的管理控制台(Management Console),可以通过它监控 RabbitMQ 服务器的各种指标,如队列的消息数量、消费者数量、连接数等。要启用管理控制台,首先需要安装 rabbitmq - management 插件:
rabbitmq - plugins enable rabbitmq_management
  • 安装完成后,通过浏览器访问 http://localhost:15672(默认端口),使用默认用户名 guest 和密码 guest(生产环境建议修改)登录。在管理控制台中,可以看到各个队列的详细信息,例如:
  • 队列页面截图:[此处插入 RabbitMQ 管理控制台队列页面截图,展示队列名称、消息数量、消费者数量等信息]
  • 通过观察队列的消息数量变化,可以判断是否存在消息积压问题。如果消息数量持续增加且消费者数量正常,很可能是消费者处理能力不足。同时,在连接页面可以查看生产者和消费者的连接状态,如果有连接异常断开的情况,可能导致消息丢失或发送失败。
  1. Kafka Tools
    • Kafka 没有像 RabbitMQ 那样内置的 Web 管理控制台,但有一些第三方工具可以用于监控 Kafka,如 Kafka Tools。Kafka Tools 是一个跨平台的 Kafka 管理工具,可以查看 Kafka 集群的拓扑结构、主题(topic)的详细信息、消费者组的消费进度等。
    • 安装与使用:下载并安装 Kafka Tools,打开软件后,添加 Kafka 集群连接,输入 Kafka 集群的 bootstrap.servers 地址。在界面中,可以看到主题列表,点击主题可以查看分区信息、每个分区的偏移量等。例如,在查看消费者组时,可以看到每个消费者组的消费进度,如果消费进度长时间停滞不前,可能存在消息积压或消费者故障。
    • Kafka Tools 主题详情页面截图:[此处插入 Kafka Tools 主题详情页面截图,展示分区、偏移量等信息]
  2. RocketMQ Console
    • RocketMQ 官方提供了 RocketMQ Console 用于监控和管理 RocketMQ 集群。首先需要下载 RocketMQ Console 的代码并进行编译打包,然后通过以下命令启动:
java -jar rocketmq - console - n. n. n. jar -- rocketmq. namesrv. addr = namesrv1:9876;namesrv2:9876 -- server. port = 8080
  • 启动后,通过浏览器访问 http://localhost:8080,可以看到 RocketMQ 集群的整体信息,包括主题、队列、消费者组等。在主题页面,可以查看消息的发送速率、消费速率,如果发送速率远大于消费速率,可能存在消息积压。同时,在消费者组页面,可以查看消费者的状态,如是否在线、消费偏移量等信息,有助于排查消费者相关的故障。
  • RocketMQ Console 主题页面截图:[此处插入 RocketMQ Console 主题页面截图,展示发送速率、消费速率等信息]

日志分析

  1. 生产者日志
    • 生产者的日志可以提供消息发送过程中的详细信息,帮助排查消息丢失、发送失败等问题。例如,在使用 RabbitMQ 时,通过配置日志级别,可以在日志中看到消息发送的具体情况。在 Python 的 pika 库中,可以通过以下方式配置日志:
import logging

logging.basicConfig(level = logging.INFO)
  • 当消息发送失败时,日志中可能会记录类似 AMQPConnectionErrorChannelClosedByBroker 等错误信息,根据这些信息可以进一步定位问题。如果是网络问题导致的 AMQPConnectionError,可以检查网络连接、防火墙设置等。
  • 示例日志
[INFO] 2023 - 10 - 01 12:00:00,000 Starting basic publish
[ERROR] 2023 - 10 - 01 12:00:01,000 AMQPConnectionError: Connection to broker lost
  1. 消息队列日志
    • 消息队列自身的日志记录了其内部的运行情况,对于排查消息丢失、队列故障等问题非常关键。以 Kafka 为例,Kafka 的日志文件位于 logs 目录下,主要有 server.logstate - change.log 等。
    • server.log 记录了 Kafka 服务器的启动、停止、节点间通信等信息。如果 Kafka 发生崩溃,server.log 中会记录崩溃前的异常信息,如 OutOfMemoryError 等,提示可能是内存不足导致的故障。
    • state - change.log 记录了 Kafka 集群状态的变化,如主题的创建、删除,分区的重新分配等。当出现消息积压或丢失问题时,查看这个日志可以了解是否因为集群状态变化导致了问题。
    • 示例 Kafka server.log 日志
[2023 - 10 - 01 12:00:00,000] INFO [KafkaServer id = 0] started (kafka.server.KafkaServer)
[2023 - 10 - 01 12:05:00,000] ERROR [KafkaServer id = 0] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
java.lang.OutOfMemoryError: Java heap space
  1. 消费者日志
    • 消费者日志可以帮助排查消息消费失败、重复消费等问题。在消费者代码中,通过记录详细的日志信息,如消息处理开始时间、结束时间、处理结果等,可以分析消费过程中的问题。例如,在使用 Kafka 消费者时,可以在回调函数中记录日志:
from confluent_kafka import Consumer, KafkaError

c = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'test_group',
    'auto.offset.reset': 'earliest'
})

c.subscribe(['test_topic'])

import logging
logging.basicConfig(level = logging.INFO)

while True:
    msg = c.poll(1.0)

    if msg is None:
        continue
    if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            continue
        else:
            logging.error(msg.error())
            break

    logging.info('Received message: {}'.format(msg.value().decode('utf - 8')))
    try:
        # 处理消息
        pass
        logging.info('Message processed successfully')
        c.commit()
    except Exception as e:
        logging.error(f"Error processing message: {e}")

c.close()
  • 如果日志中频繁出现 Error processing message 错误信息,说明消费者在处理消息时遇到问题,需要进一步分析异常原因,可能是业务逻辑错误、依赖服务不可用等。

抓包工具

  1. Wireshark
    • Wireshark 是一款常用的网络抓包工具,可以捕获网络流量并分析其中的协议数据。在排查消息队列故障时,如果怀疑是网络问题导致消息丢失、积压等,可以使用 Wireshark 捕获生产者、消息队列和消费者之间的网络流量。
    • 使用方法:启动 Wireshark 后,选择正确的网络接口(如 eth0)开始捕获流量。然后,在生产者发送消息、消费者接收消息的过程中,Wireshark 会捕获到相关的网络数据包。对于 RabbitMQ,其使用 AMQP 协议,在 Wireshark 中可以过滤出 AMQP 相关的数据包进行分析。例如,如果发现 AMQP 数据包中有大量的重传请求,可能说明网络不稳定,导致消息传输出现问题。
    • Wireshark 捕获 AMQP 数据包截图:[此处插入 Wireshark 捕获 AMQP 数据包截图,展示数据包详细信息]
  2. tcpdump
    • tcpdump 是一个基于命令行的网络抓包工具,在 Linux 系统中广泛使用。它可以捕获指定网络接口的 TCP、UDP 等数据包。例如,要捕获 eth0 接口上与 Kafka 服务器(假设 IP 为 192.168.1.100,端口 9092)相关的流量,可以使用以下命令:
tcpdump -i eth0 host 192.168.1.100 and port 9092 -w kafka.pcap
  • 上述命令会将捕获到的流量保存到 kafka.pcap 文件中,然后可以使用 Wireshark 等工具打开该文件进行详细分析。tcpdump 适合在服务器上快速捕获网络流量,而 Wireshark 更适合进行可视化的数据分析。

消息队列故障定位实战案例

案例一:RabbitMQ 消息丢失问题定位

  1. 问题描述
    • 在一个电商订单处理系统中,使用 RabbitMQ 作为消息队列。生产者将订单消息发送到 RabbitMQ 后,部分订单消息在消费者端未收到,导致订单处理不完整。
  2. 排查过程
    • 检查生产者代码:首先查看生产者的日志,发现没有明显的发送失败错误。但是,进一步检查代码发现,生产者没有使用发送确认机制(publisher confirm),并且 basicPublish 方法没有设置 mandatory 参数为 True。这意味着即使消息发送到 RabbitMQ 服务器失败,生产者也不会收到相关通知。
    • 检查 RabbitMQ 管理控制台:登录 RabbitMQ 管理控制台,查看队列信息,发现队列中的消息数量与生产者发送的消息数量不一致,说明部分消息在 RabbitMQ 内部丢失。查看 RabbitMQ 的日志,发现有磁盘空间不足的警告信息。原来,由于服务器磁盘空间已满,RabbitMQ 在写入消息到磁盘时失败,导致部分消息丢失。
  3. 解决方案
    • 修改生产者代码:在生产者端启用发送确认机制,并设置 mandatory 参数为 True,以便在消息发送失败时能及时收到通知并进行处理。
    • 清理磁盘空间:清理 RabbitMQ 服务器所在主机的磁盘空间,确保 RabbitMQ 有足够的空间存储消息。同时,可以考虑调整 RabbitMQ 的消息存储策略,如增加内存缓存消息的比例,减少对磁盘的依赖。

案例二:Kafka 消息积压问题定位

  1. 问题描述
    • 在一个实时数据分析系统中,使用 Kafka 作为消息队列。生产者不断将业务数据发送到 Kafka 主题,但消费者处理数据的速度逐渐跟不上,导致消息积压越来越严重,最终影响数据分析的实时性。
  2. 排查过程
    • 使用 Kafka Tools 监控:通过 Kafka Tools 查看主题的分区信息和消费者组的消费进度。发现某个分区的消息堆积量特别大,而其他分区相对正常。进一步查看消费者组信息,发现消费该分区的消费者实例出现了 CPU 使用率过高的情况。
    • 分析消费者代码:检查消费者代码,发现消费者在处理消息时进行了复杂的数据库查询操作,并且没有进行合理的优化。这导致消费者处理消息的速度很慢,无法及时消费 Kafka 中的消息。
  3. 解决方案
    • 优化消费者代码:对消费者中的数据库查询操作进行优化,例如添加合适的索引、批量查询等。同时,可以考虑将一些复杂的计算逻辑放到离线任务中处理,提高消费者实时处理消息的能力。
    • 调整分区策略:根据业务数据的特点,调整 Kafka 主题的分区策略,使消息更均匀地分布在各个分区,避免某个分区出现严重的消息积压。

案例三:RocketMQ 消息重复消费问题定位

  1. 问题描述
    • 在一个订单支付系统中,使用 RocketMQ 作为消息队列。消费者在处理支付成功的消息时,偶尔会出现重复处理订单的情况,导致订单重复支付。
  2. 排查过程
    • 检查消费者代码:查看消费者代码,发现消费者在处理消息时没有进行幂等性处理。每次收到支付成功的消息,直接更新订单状态为已支付,没有检查订单是否已经是已支付状态。
    • 查看 RocketMQ Console:在 RocketMQ Console 中查看消费者组的重试次数,发现有部分消息进行了多次重试。这是因为消费者在处理这些消息时抛出了异常,RocketMQ 根据重试策略进行了重试,但由于消费者没有幂等性处理,导致消息重复消费。
  3. 解决方案
    • 添加幂等性处理:在消费者处理消息时,先查询订单状态,如果订单已经是已支付状态,则不再进行重复处理。可以使用数据库的唯一索引或者缓存来实现幂等性判断。
    • 优化异常处理:在消费者代码中,对可能出现的异常进行更细致的处理,避免不必要的重试。例如,对于一些瞬时性的网络异常,可以进行适当的重试,但对于业务逻辑错误导致的异常,应及时记录并停止重试。