Kafka 开发中如何保障集群的高可用性与容错性
2022-10-117.5k 阅读
Kafka 集群架构概述
Kafka 作为一款分布式流处理平台,其集群架构对于理解高可用性与容错性至关重要。Kafka 集群由多个 Broker 节点组成,每个 Broker 负责管理部分分区(Partition)的数据。
-
Broker 节点
- Broker 是 Kafka 集群的核心组件,它接收生产者发送的消息,为消费者提供拉取消息的服务。每个 Broker 有一个唯一的标识符,通常是一个整数。多个 Broker 共同组成 Kafka 集群,它们之间通过 ZooKeeper 进行协调。
- 例如,在一个简单的 Kafka 集群部署中,可能有三个 Broker 节点,其标识符分别为 0、1、2。这些 Broker 节点共同承担着存储和传输消息的任务。
-
主题(Topic)与分区(Partition)
- 主题是 Kafka 中消息的逻辑分类,类似于数据库中的表。每个主题可以划分为多个分区,分区是 Kafka 并行处理消息的基本单位。分区的数据是有序的,而主题内不同分区之间的消息顺序不保证。
- 以一个电商订单消息主题为例,可以根据订单 ID 的哈希值将订单消息分配到不同的分区中。假设主题
order_topic
有 5 个分区,订单 ID 为 123 的消息可能会根据哈希计算被分配到分区 2 中。
-
副本(Replica)
- 为了实现高可用性,Kafka 为每个分区创建多个副本。这些副本分布在不同的 Broker 节点上。其中一个副本被指定为领导者(Leader)副本,负责处理该分区的读写请求,其他副本为追随者(Follower)副本。
- 例如,对于分区
order_topic - 2
,在 Broker 0 上的副本是领导者副本,而在 Broker 1 和 Broker 2 上的副本是追随者副本。追随者副本会定期从领导者副本同步数据,以保持数据的一致性。
-
ZooKeeper 的作用
- ZooKeeper 在 Kafka 集群中扮演着至关重要的角色。它负责管理集群的元数据,包括 Broker 节点的注册与发现、主题与分区的元数据管理、领导者选举等。
- 当一个新的 Broker 节点加入集群时,它会向 ZooKeeper 注册自己的信息。ZooKeeper 会通知其他 Broker 节点关于新节点的加入,以便集群重新进行负载均衡和元数据更新。
高可用性保障机制
- 副本机制
- 领导者选举
- 当领导者副本所在的 Broker 节点发生故障时,Kafka 需要从追随者副本中选举出一个新的领导者。ZooKeeper 会检测到领导者副本所在 Broker 的故障,并触发领导者选举过程。
- Kafka 使用一种基于 ISR(In - Sync Replicas,同步副本集)的选举机制。ISR 是指与领导者副本保持一定程度同步的追随者副本集合。只有在 ISR 中的副本才有资格被选举为新的领导者。
- 例如,假设分区
order_topic - 2
的领导者副本在 Broker 0 上,当 Broker 0 发生故障时,ZooKeeper 会在该分区的 ISR 中(假设 ISR 包含 Broker 1 和 Broker 2 上的副本)选举一个新的领导者。如果 Broker 1 上的副本的 LEO(Log End Offset,日志末尾偏移量)最大,那么 Broker 1 上的副本将被选举为新的领导者。
- 数据同步
- 追随者副本通过向领导者副本发送 FETCH 请求来同步数据。领导者副本会将消息的偏移量和数据发送给追随者副本。
- Kafka 使用一种基于拉取(Pull)的同步模型,追随者副本主动从领导者副本拉取数据,这样可以更好地适应不同的网络环境和副本负载。
- 代码示例(使用 Java 和 Kafka 客户端 API 展示如何创建一个简单的生产者向主题发送消息,以及消费者从主题拉取消息,间接体现副本的数据同步过程):
- 领导者选举
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class KafkaExample {
public static void main(String[] args) {
// 生产者配置
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
Producer<String, String> producer = new KafkaProducer<>(producerProps);
// 发送消息
ProducerRecord<String, String> record = new ProducerRecord<>("test_topic", "key1", "value1");
try {
producer.send(record).get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
producer.close();
// 消费者配置
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(java.util.Collections.singletonList("test_topic"));
// 拉取消息
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record1 : records) {
System.out.println("Received message: " + record1.value());
}
}
}
}
-
ISR 动态管理
- ISR 中的副本是与领导者副本保持同步的追随者副本。Kafka 会动态地维护 ISR 的成员列表。如果一个追随者副本长时间没有从领导者副本同步数据,它会被从 ISR 中移除。
- 同步的判定标准基于副本的 LEO 和领导者副本的 LEO 之间的差距。如果差距超过一定阈值,该追随者副本就被认为是不同步的。
- 例如,假设领导者副本的 LEO 为 1000,而某个追随者副本的 LEO 为 900,且这个差距持续超过了配置的阈值(如 100 个消息偏移量),那么这个追随者副本将被从 ISR 中移除。当该追随者副本重新追上领导者副本时,它又会被重新加入到 ISR 中。
-
Broker 节点故障处理
- 当一个 Broker 节点发生故障时,Kafka 集群会自动进行一系列的恢复操作。ZooKeeper 会检测到 Broker 的故障,并通知其他 Broker 节点。
- 对于故障 Broker 上的分区副本,如果该副本是领导者副本,会触发领导者选举,从 ISR 中的追随者副本中选出新的领导者。对于追随者副本,其他 Broker 上的追随者副本会继续从新的领导者副本同步数据。
- 例如,假设 Broker 1 发生故障,该 Broker 上有分区
test_topic - 3
的领导者副本。ZooKeeper 会通知集群,然后在该分区的 ISR 中选举一个新的领导者,比如 Broker 2 上的副本。其他追随者副本(如 Broker 3 上的副本)会开始从 Broker 2 上的新领导者副本同步数据。
容错性保障机制
- 数据持久性
- 日志存储
- Kafka 使用日志文件来存储消息。每个分区的消息被追加写入到对应的日志文件中。日志文件以段(Segment)为单位进行管理,每个段包含一定数量的消息。
- 当一个段的大小达到配置的阈值(如 1GB)或者经过一定时间(如 1 天),会创建一个新的段。这样可以方便地管理和清理日志文件。
- 例如,分区
test_topic - 1
的日志文件可能命名为test_topic - 1 - 0.log
,test_topic - 1 - 1.log
等。当test_topic - 1 - 0.log
达到 1GB 时,会创建test_topic - 1 - 1.log
继续存储消息。
- 日志清理策略
- Kafka 支持两种主要的日志清理策略:删除(Delete)和压缩(Compact)。
- 删除策略是根据配置的保留时间(如 7 天)或保留大小(如 10GB)删除旧的日志段。如果设置保留时间为 7 天,那么 7 天前的日志段会被删除。
- 压缩策略主要用于需要保留最新值的场景。它会根据消息的键对日志进行压缩,只保留每个键的最新值。例如,在用户信息更新的场景中,只保留每个用户的最新信息。
- 日志存储
- 网络故障处理
- 生产者重试机制
- 当生产者向 Kafka 集群发送消息时,如果遇到网络故障导致消息发送失败,生产者会根据配置进行重试。生产者可以配置重试次数(如 3 次)和重试间隔时间(如 100 毫秒)。
- 例如,以下是使用 Java Kafka 客户端配置生产者重试机制的代码片段:
- 生产者重试机制
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(ProducerConfig.RETRIES_CONFIG, 3);
producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100);
Producer<String, String> producer = new KafkaProducer<>(producerProps);
- 消费者重新平衡
- 当消费者所在的节点发生网络故障或者新的消费者加入/离开消费组时,会触发消费者重新平衡。Kafka 会重新分配分区给各个消费者。
- 例如,在一个包含三个消费者的消费组中,当其中一个消费者因为网络故障断开连接时,Kafka 会将该消费者之前负责的分区重新分配给剩下的两个消费者。
- 负载均衡
- 分区分配策略
- Kafka 有多种分区分配策略,如 Range 策略、Round - Robin 策略和 Sticky 策略。
- Range 策略是按照主题的分区顺序依次分配给消费者。例如,对于主题
test_topic
有 6 个分区,3 个消费者,Range 策略会将分区 0、1 分配给第一个消费者,分区 2、3 分配给第二个消费者,分区 4、5 分配给第三个消费者。 - Round - Robin 策略是将所有主题的分区轮询分配给消费者。这种策略在多个主题且消费者数量较少的情况下可以更好地实现负载均衡。
- Sticky 策略是在重新平衡时尽量保持已有的分区分配,只有在必要时才进行调整,以减少数据重新分配带来的开销。
- Broker 负载均衡
- Kafka 集群会自动进行 Broker 之间的负载均衡。当新的 Broker 节点加入集群或者某个 Broker 节点负载过高时,Kafka 会通过重新分配分区来平衡负载。
- ZooKeeper 会协助 Kafka 监控 Broker 节点的负载情况,并触发分区的重新分配。例如,如果 Broker 0 的磁盘使用率过高,Kafka 会将部分分区从 Broker 0 迁移到其他负载较低的 Broker 节点上。
- 分区分配策略
配置优化与监控
- Kafka 配置参数优化
- 副本相关配置
- replica.lag.time.max.ms:这个参数定义了追随者副本与领导者副本之间允许的最大同步延迟时间。如果一个追随者副本在这个时间内没有从领导者副本同步数据,它将被从 ISR 中移除。默认值为 10000 毫秒,根据实际网络环境和数据量,可以适当调整这个值。例如,在网络不稳定的环境中,可以适当增大这个值,避免不必要的副本移除。
- num.replica.fetchers:指定每个追随者副本从领导者副本拉取数据的线程数。默认值为 1,对于数据量较大的集群,可以适当增加这个值,以提高同步效率。比如设置为 3,可加快数据同步速度,但也会增加 Broker 的资源消耗。
- 生产者配置
- acks:该参数定义了生产者在收到来自 Broker 的确认之前需要等待的副本数量。取值有 0、1 和 - 1(all)。取值为 0 时,生产者不会等待 Broker 的确认,消息发送速度最快,但可能会丢失消息;取值为 1 时,生产者会等待领导者副本的确认,在领导者副本成功写入消息后就认为发送成功,这种情况下可能会在领导者副本故障且未同步到追随者副本时丢失消息;取值为 - 1(all)时,生产者会等待所有 ISR 中的副本确认,这种方式可以保证消息的持久性,但会降低发送性能。根据应用对消息可靠性和性能的要求来选择合适的值,对于金融交易等对数据可靠性要求极高的场景,通常设置为 - 1。
- linger.ms:生产者会将消息在内存中缓存一段时间(以毫秒为单位),等待更多的消息一起发送,以提高网络传输效率。默认值为 0,即消息立即发送。如果应用场景允许一定的延迟,可以适当增大这个值,比如设置为 100 毫秒,这样可以减少网络请求次数,提高整体吞吐量。
- 消费者配置
- fetch.min.bytes:消费者从 Broker 拉取数据时,要求 Broker 返回的最小数据量。默认值为 1,增大这个值可以减少网络请求次数,但可能会增加消费者的等待时间。例如,设置为 1024(1KB),当 Broker 上该分区的数据量不足 1KB 时,消费者会等待直到数据量达到 1KB 或者等待时间超过
fetch.max.wait.ms
。 - max.poll.records:指定每次
poll
方法调用时返回的最大记录数。默认值为 500,根据消费者的处理能力来调整这个值。如果消费者处理消息速度较快,可以适当增大这个值,提高消费效率;如果处理速度较慢,设置过大可能导致处理不及时,消息积压。
- fetch.min.bytes:消费者从 Broker 拉取数据时,要求 Broker 返回的最小数据量。默认值为 1,增大这个值可以减少网络请求次数,但可能会增加消费者的等待时间。例如,设置为 1024(1KB),当 Broker 上该分区的数据量不足 1KB 时,消费者会等待直到数据量达到 1KB 或者等待时间超过
- 副本相关配置
- 监控指标与工具
- Kafka 自带监控指标
- Broker 指标:
- kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:表示每秒进入 Broker 的消息数量,通过监控这个指标可以了解集群的整体负载情况。如果该指标持续过高,可能需要考虑增加 Broker 节点。
- kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions:显示未完全同步的分区数量。正常情况下这个值应该为 0,如果出现非零值,说明集群中存在副本同步问题,需要及时排查。
- 生产者指标:
- kafka.producer:type=producer - metrics,client - id=xxx,name=record - send - rate:表示生产者每秒发送的记录数,反映了生产者的发送速率。如果这个指标突然下降,可能是生产者遇到了网络问题或者配置问题。
- kafka.producer:type=producer - metrics,client - id=xxx,name=request - latency - avg:平均请求延迟,即生产者发送请求到收到响应的平均时间。如果这个值过高,可能是网络延迟或者 Broker 负载过高导致的。
- 消费者指标:
- kafka.consumer:type=consumer - fetcher - manager - metrics,client - id=xxx,name=records - fetched - rate:表示消费者每秒拉取的记录数,监控这个指标可以了解消费者的消费速度。如果该指标过低,可能是消费者处理能力不足或者分区分配不合理。
- kafka.consumer:type=consumer - coordinator - metrics,client - id=xxx,name=join - time - avg:平均加入消费组时间,反映了消费者加入消费组的延迟情况。如果这个值过高,可能是消费组重新平衡过程中出现了问题。
- Broker 指标:
- 监控工具
- Kafka Manager:它是一个开源的 Kafka 集群管理工具,可以直观地查看 Kafka 集群的状态,包括 Broker 节点信息、主题与分区详情、副本状态等。通过它可以方便地创建、删除主题,调整分区数量等操作。
- Prometheus + Grafana:Prometheus 可以收集 Kafka 的各种指标数据,Grafana 则用于将这些数据可视化展示。可以创建各种自定义的仪表盘,实时监控 Kafka 集群的性能和健康状况。例如,可以创建一个仪表盘同时展示 Broker 的负载、生产者的发送速率和消费者的消费速率等指标,以便及时发现问题。
- Kafka 自带监控指标
常见问题及解决方案
- 副本同步延迟
- 原因分析
- 网络问题:追随者副本所在 Broker 与领导者副本所在 Broker 之间的网络延迟过高或者网络不稳定,导致数据同步缓慢。例如,网络带宽不足,大量的网络丢包等情况。
- 磁盘 I/O 性能:如果 Broker 节点的磁盘 I/O 性能较差,写入日志文件的速度慢,会影响副本同步。比如磁盘老化、磁盘空间不足等原因。
- Broker 负载过高:当 Broker 节点处理过多的请求,CPU 或内存资源紧张时,会导致副本同步延迟。例如,同时有大量的生产者和消费者请求,超出了 Broker 的处理能力。
- 解决方案
- 优化网络:检查网络配置,确保 Broker 节点之间有足够的带宽,减少网络延迟和丢包。可以使用网络测试工具(如 ping、traceroute 等)排查网络问题,必要时升级网络设备或调整网络拓扑。
- 优化磁盘 I/O:检查磁盘使用情况,清理不必要的文件,确保磁盘有足够的空间。如果磁盘性能确实低下,可以考虑更换高性能磁盘(如 SSD)。
- 调整 Broker 负载:通过监控指标确定 Broker 负载过高的原因。如果是因为请求过多,可以适当增加 Broker 节点分担负载;或者调整生产者和消费者的配置,如减小生产者的发送速率、调整消费者的并发度等。
- 原因分析
- 数据丢失问题
- 原因分析
- 生产者配置不当:如
acks
设置为 0 或者 1,且在领导者副本还未将消息同步到追随者副本时发生故障,可能导致消息丢失。另外,生产者重试次数设置过少,在遇到网络故障等问题时,消息发送失败后没有足够的重试次数。 - 副本同步异常:ISR 中副本数量过少,当领导者副本故障时,没有足够的同步副本可以选举为新的领导者,可能导致部分消息丢失。或者 ISR 动态管理出现问题,错误地将正常的副本从 ISR 中移除。
- 生产者配置不当:如
- 解决方案
- 正确配置生产者:根据应用对数据可靠性的要求,合理设置
acks
参数,对于高可靠性要求的场景,设置为 - 1。同时,适当增加重试次数和重试间隔时间,确保消息能够成功发送。 - 优化副本管理:确保 ISR 中有足够数量的副本,并且合理配置与 ISR 相关的参数,如
replica.lag.time.max.ms
等,避免正常副本被误移除。定期检查副本同步状态,及时发现并解决副本同步异常问题。
- 正确配置生产者:根据应用对数据可靠性的要求,合理设置
- 原因分析
- 消息积压
- 原因分析
- 消费者处理能力不足:消费者的业务逻辑复杂,处理单个消息的时间过长,导致消费速度跟不上生产速度。例如,消费者需要进行大量的数据库查询、复杂的计算等操作。
- 分区分配不合理:如果消费者数量与分区数量不匹配,或者分区分配策略不合适,可能导致部分消费者负载过重,而部分消费者空闲,从而造成消息积压。
- 解决方案
- 提升消费者处理能力:优化消费者的业务逻辑,减少不必要的操作,提高单个消息的处理速度。可以考虑使用多线程或异步处理的方式,提高整体消费效率。例如,将数据库查询操作进行异步化,减少主线程的等待时间。
- 合理分配分区:根据消费者数量和处理能力,合理设置主题的分区数量。选择合适的分区分配策略,如在消费者数量较少且多个主题的情况下,使用 Round - Robin 策略可能更有利于负载均衡。定期监控消费者的负载情况,必要时手动调整分区分配。
- 原因分析