MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Kafka的分区与副本机制详解

2022-01-316.6k 阅读

Kafka 分区机制

分区的基本概念

在 Kafka 中,主题(Topic)是一种逻辑上的概念,用于对消息进行分类。而分区(Partition)则是主题的物理细分。每个主题可以被划分为多个分区,每个分区是一个有序的、不可变的消息序列,这些消息会被不断地追加到分区中。

分区存在的意义重大。首先,它提供了并行处理能力。当生产者向 Kafka 集群发送消息时,消息可以并行地写入不同的分区,同样,消费者也可以并行地从不同分区读取消息,这大大提高了 Kafka 系统的数据处理能力和吞吐量。其次,分区有助于数据的分布式存储,将数据分散在多个节点上,避免单个节点存储压力过大。

分区的分配策略

  1. 轮询策略(Round - Robin)
    • 这是 Kafka 生产者默认的分区分配策略之一。当使用轮询策略时,生产者会按照主题中分区的顺序依次将消息发送到各个分区。例如,假设有一个主题 test_topic 有 3 个分区 p0p1p2,生产者发送的第一条消息会被发送到 p0,第二条消息会被发送到 p1,第三条消息会被发送到 p2,第四条消息又会回到 p0,依此类推。
    • 轮询策略的优点是非常简单且能均匀地将消息分配到各个分区,保证了各个分区的数据量相对均衡。但它没有考虑到分区的负载情况,如果某个分区所在的节点性能较差,可能会导致该分区的写入延迟较高。
    • 在 Java 代码中,使用 Kafka 生产者时,可以通过配置 partitioner.classorg.apache.kafka.clients.producer.internals.DefaultPartitioner 来使用轮询策略(这是默认配置)。以下是一个简单的 Java 生产者示例:
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class KafkaProducerExample {
    public static void main(String[] args) {
        String topicName = "test_topic";
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 使用默认的轮询分区策略
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.apache.kafka.clients.producer.internals.DefaultPartitioner");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "key_" + i, "value_" + i);
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        exception.printStackTrace();
                    } else {
                        System.out.println("Message sent to partition " + metadata.partition() + " at offset " + metadata.offset());
                    }
                }
            });
        }
        producer.close();
    }
}
  1. 随机策略(Random)
    • 随机策略就是生产者随机地将消息发送到主题的各个分区中。它会在每次发送消息时,从主题的所有分区中随机选择一个分区来发送消息。这种策略也能在一定程度上实现消息在分区间的分散,但可能不如轮询策略那样均匀。例如,可能会出现短期内某些分区接收的消息较多,而某些分区接收的消息较少的情况。
    • 在 Java 代码中,如果要使用随机策略,可以自定义一个分区器类来实现随机分配。以下是一个简单的自定义随机分区器示例:
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

import java.util.List;
import java.util.Map;
import java.util.Random;

public class RandomPartitioner implements Partitioner {
    private final Random random = new Random();

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        return random.nextInt(numPartitions);
    }

    @Override
    public void close() {
        // 关闭资源(这里无资源可关)
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // 配置(这里无特殊配置)
    }
}

然后在生产者配置中使用这个自定义分区器:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class KafkaProducerWithRandomPartitionerExample {
    public static void main(String[] args) {
        String topicName = "test_topic";
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 使用自定义的随机分区器
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "RandomPartitioner");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "key_" + i, "value_" + i);
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        exception.printStackTrace();
                    } else {
                        System.out.println("Message sent to partition " + metadata.partition() + " at offset " + metadata.offset());
                    }
                }
            });
        }
        producer.close();
    }
}
  1. 按消息键(Key)策略
    • 当使用按消息键策略时,生产者会根据消息的键(Key)来决定消息发送到哪个分区。具体做法是对消息的键进行哈希计算,然后将哈希值对分区数量取模,得到的结果就是消息要发送到的分区编号。例如,假设有一个主题有 5 个分区,消息键为 "test_key",对 "test_key" 进行哈希计算得到哈希值 hashValuepartition = hashValue % 5,那么该消息就会被发送到编号为 partition 的分区。
    • 这种策略的好处是可以保证具有相同键的消息会被发送到同一个分区,这在一些场景下非常有用,比如需要对具有相同标识的数据进行顺序处理时。例如,在电商订单处理中,如果以订单号作为消息键,那么同一个订单的所有消息(如订单创建、订单支付、订单发货等)都会被发送到同一个分区,消费者从该分区读取消息时就能按顺序处理订单相关的所有操作。
    • 在 Java 生产者中,默认的 DefaultPartitioner 会在消息键不为空时使用按消息键策略。如果要明确使用这种策略,只需确保消息键不为空即可。以下是一个示例:
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class KafkaProducerWithKeyPartitionerExample {
    public static void main(String[] args) {
        String topicName = "test_topic";
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            // 使用订单号作为消息键
            String orderId = "order_" + (i % 3);
            ProducerRecord<String, String> record = new ProducerRecord<>(topicName, orderId, "order_info_" + i);
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        exception.printStackTrace();
                    } else {
                        System.out.println("Message sent to partition " + metadata.partition() + " at offset " + metadata.offset());
                    }
                }
            });
        }
        producer.close();
    }
}

分区的存储结构

Kafka 分区的数据存储在磁盘上,每个分区在文件系统中对应一个目录,目录名格式为 topic_name - partition_id。例如,对于主题 test_topic 的分区 0,其对应的目录为 test_topic - 0

在每个分区目录下,又包含多个日志段(Log Segment)文件。日志段文件是 Kafka 数据存储的基本单位,每个日志段文件由两部分组成:日志文件(.log)和索引文件(.index)。日志文件用于存储实际的消息内容,而索引文件则用于加速消息的查找。

  1. 日志文件
    • 日志文件是一个顺序追加的文件,消息会按照顺序不断地写入到日志文件中。每个消息在日志文件中有一个唯一的偏移量(Offset),偏移量是从 0 开始的单调递增的整数。偏移量用于标识消息在分区中的位置,消费者通过偏移量来定位和读取消息。
    • 日志文件的大小是有限制的,当日志文件达到一定大小(可通过配置 log.segment.bytes 来设置,默认值为 1GB)时,就会创建一个新的日志段文件。例如,第一个日志段文件可能名为 00000000000000000000.log,当它达到限制大小后,新的日志段文件会命名为 000000000000001048576.log(假设第一个日志段文件写满了 1GB,1GB 的偏移量为 1048576)。
  2. 索引文件
    • 索引文件是一个稀疏索引,它并不会为日志文件中的每一条消息都创建索引项。索引文件中的每一个索引项包含两个部分:相对偏移量(Relative Offset)和物理位置(Position)。相对偏移量是相对于当前日志段起始偏移量的差值,物理位置则是消息在日志文件中的字节偏移位置。
    • 例如,假设当前日志段的起始偏移量为 1000,消息 M1 的偏移量为 1005,它在日志文件中的物理位置为 500 字节处,那么在索引文件中就会有一个索引项,其相对偏移量为 51005 - 1000),物理位置为 500。当消费者需要查找偏移量为 1005 的消息时,首先会在索引文件中查找相对偏移量为 5 的索引项,得到物理位置 500,然后直接从日志文件的 500 字节处读取消息。这种稀疏索引的方式既减少了索引文件的大小,又能快速定位消息。

Kafka 副本机制

副本的基本概念

Kafka 的副本机制是为了保证数据的可靠性和高可用性。在 Kafka 中,每个分区可以有多个副本(Replica),副本分为领导者副本(Leader Replica)和追随者副本(Follower Replica)。

  1. 领导者副本
    • 每个分区有且仅有一个领导者副本。生产者发送的消息以及消费者读取的消息都直接与领导者副本交互。领导者副本负责处理分区的所有读写请求,它是分区数据的“主副本”。例如,当生产者向某个分区发送消息时,实际上是将消息发送到该分区的领导者副本所在的 Broker 节点上。
  2. 追随者副本
    • 追随者副本不直接处理读写请求,它们的主要任务是从领导者副本中复制数据,保持与领导者副本的数据同步。追随者副本会定期从领导者副本拉取数据,并将拉取到的数据追加到自己的日志文件中。这样,当领导者副本所在的节点出现故障时,追随者副本中的一个可以被选举为新的领导者副本,继续提供服务,从而保证数据的可用性和一致性。
    • 追随者副本的数量可以通过主题创建时的 replication - factor 参数来设置。例如,当 replication - factor 设置为 3 时,除了领导者副本外,该分区还有 2 个追随者副本。

副本的选举机制

  1. 领导者选举
    • 当领导者副本所在的 Broker 节点发生故障时,Kafka 集群需要从追随者副本中选举出一个新的领导者副本。Kafka 使用 Zookeeper 来协助进行领导者选举。
    • 首先,Kafka 集群中的每个 Broker 节点都会在 Zookeeper 上注册一个临时节点,例如 /brokers/ids/<broker_id>。当一个 Broker 节点故障时,其在 Zookeeper 上对应的临时节点会被删除。
    • 对于每个分区,Kafka 会在 Zookeeper 上创建一个持久节点 /brokers/topics/<topic_name>/partitions/<partition_id>/state,该节点存储了分区的副本状态信息,包括领导者副本所在的 Broker 节点 ID 等。当领导者副本所在的 Broker 节点故障时,Zookeeper 上该分区的状态节点中的领导者信息会失效。
    • 追随者副本会定期向 Zookeeper 发送心跳请求,以表明自己的存活状态。当领导者副本故障后,存活的追随者副本中第一个检测到领导者故障(通过 Zookeeper 节点变化通知)的副本会尝试在 Zookeeper 上创建一个临时顺序节点 /controller_epoch/<epoch_number>/leader_and_isr/<topic_name>/<partition_id>,如果创建成功,该副本就成为新的领导者副本,并将自己的 Broker 节点 ID 更新到 Zookeeper 上该分区的状态节点中。其他追随者副本发现新的领导者节点创建成功后,会停止尝试选举,并开始从新的领导者副本同步数据。
  2. ISR(In - Sync Replicas)
    • ISR 是指与领导者副本保持同步的追随者副本集合。只有在 ISR 中的追随者副本才有资格被选举为新的领导者副本。Kafka 会动态维护每个分区的 ISR。
    • 追随者副本通过定期向领导者副本发送 Fetch 请求来拉取数据,领导者副本会记录每个追随者副本的滞后情况。如果一个追随者副本在一定时间内(可通过配置 replica.lag.time.max.ms 来设置,默认值为 10000 毫秒)没有向领导者副本发送 Fetch 请求,或者它落后领导者副本的消息数超过一定阈值(可通过配置 replica.lag.max.messages 来设置,默认值为 4000 条,在 Kafka 0.10.1 及以后版本中此配置已弃用,主要由 replica.lag.time.max.ms 控制),那么该追随者副本会被从 ISR 中移除。
    • 当一个原本落后的追随者副本重新追上领导者副本时,它会被重新加入到 ISR 中。例如,假设一个追随者副本由于网络问题在一段时间内没有从领导者副本拉取数据,被移出了 ISR。当网络恢复后,它开始快速拉取数据,当它与领导者副本的差距在允许范围内时,就会被重新加入 ISR。在选举新的领导者副本时,只会从 ISR 中的追随者副本中进行选举,这保证了新选举出的领导者副本具有相对较新的数据,从而维持数据的一致性。

副本的同步机制

  1. 基于拉取(Pull - based)的同步
    • Kafka 的副本同步采用基于拉取的模式,即追随者副本主动从领导者副本拉取数据。追随者副本通过向领导者副本发送 Fetch 请求来获取数据。Fetch 请求中包含了追随者副本当前的偏移量信息,领导者副本根据这个偏移量,从自己的日志文件中读取相应的数据,并返回给追随者副本。
    • 例如,假设追随者副本当前的偏移量为 100,它向领导者副本发送 Fetch 请求,领导者副本会从偏移量 100 开始读取数据(通常会读取一定量的数据,如配置的 fetch.max.bytes 大小的数据),然后将这些数据返回给追随者副本。追随者副本接收到数据后,会将其追加到自己的日志文件中,并更新自己的偏移量。
  2. 同步的一致性保证
    • Kafka 通过 ISR 和多副本机制来保证数据的一致性。当生产者发送一条消息到领导者副本时,领导者副本会将消息写入自己的日志文件,并等待 ISR 中的所有追随者副本都成功复制该消息后,才会向生产者发送确认响应。
    • 例如,假设有一个分区有 3 个副本(1 个领导者副本和 2 个追随者副本),生产者发送一条消息到领导者副本。领导者副本将消息写入日志文件后,会等待两个追随者副本都成功复制该消息。只有当两个追随者副本都复制成功后,领导者副本才会向生产者发送确认消息,表示消息已成功写入。这样可以确保即使领导者副本发生故障,从 ISR 中选举出的新领导者副本也包含了这条消息,从而保证了数据的一致性。
    • 但是,如果在领导者副本等待追随者副本复制消息的过程中,某个追随者副本一直没有响应,而此时 ISR 中剩下的副本数量小于 min.insync.replicas(可通过主题配置设置,默认值为 1),领导者副本会向生产者返回错误,告知消息写入失败。这是为了防止数据丢失,因为如果在 ISR 副本数量不足的情况下继续确认消息写入成功,当领导者副本故障时,可能会导致部分消息丢失。

副本机制对性能和可靠性的影响

  1. 对性能的影响
    • 副本机制在一定程度上会影响 Kafka 的性能。一方面,由于追随者副本需要从领导者副本拉取数据,这会增加网络带宽的消耗。特别是在副本数量较多或者数据量较大的情况下,网络带宽可能成为瓶颈。例如,如果一个主题有 5 个副本,每个副本都需要从领导者副本拉取大量数据,那么网络带宽的压力会显著增加。
    • 另一方面,领导者副本在等待追随者副本复制消息时,会增加生产者的等待时间,从而影响生产者的写入性能。如果 ISR 中的追随者副本数量较多,并且某些副本由于网络或磁盘 I/O 等问题复制速度较慢,生产者可能需要等待较长时间才能得到确认响应。
    • 然而,通过合理配置副本数量和优化网络、磁盘等基础设施,可以在一定程度上缓解性能问题。例如,根据实际的网络带宽和服务器性能,设置合适的副本数量,避免过多副本导致的性能开销。同时,使用高速网络和高性能磁盘,也可以提高副本同步的速度,减少对生产者和消费者性能的影响。
  2. 对可靠性的影响
    • 副本机制大大提高了 Kafka 的数据可靠性。通过多副本存储,即使某个 Broker 节点发生故障,只要 ISR 中有足够的副本存活,数据就不会丢失。例如,当一个包含多个副本的分区的领导者副本所在节点故障时,Kafka 可以从 ISR 中的追随者副本中快速选举出一个新的领导者副本,继续提供服务,并且新领导者副本中的数据与原领导者副本基本一致,保证了数据的连续性和完整性。
    • 同时,Kafka 的副本同步机制和 ISR 动态维护机制,确保了数据在副本之间的一致性。只有在 ISR 中的副本都确认复制成功后,才会向生产者确认消息写入,这有效防止了数据丢失和不一致的情况发生。不过,为了进一步提高可靠性,还可以合理设置 min.insync.replicas 等参数,根据实际需求调整对数据可靠性的要求。例如,在对数据可靠性要求极高的场景下,可以将 min.insync.replicas 设置为大于 1 的值,这样即使有个别追随者副本故障,只要 ISR 中剩余副本数量满足 min.insync.replicas 的要求,数据依然能够保证一致性和可靠性。

通过深入理解 Kafka 的分区与副本机制,开发者可以更好地利用 Kafka 的特性,构建高可用、高性能且数据可靠的分布式消息系统。在实际应用中,需要根据具体的业务场景和需求,合理配置分区和副本相关的参数,以达到最佳的系统性能和可靠性。