MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Kafka 消息存储机制全解

2023-10-103.6k 阅读

Kafka 基础架构简介

在深入探讨 Kafka 消息存储机制之前,先来简要了解一下 Kafka 的基础架构。Kafka 是一个分布式流处理平台,主要由以下几个核心组件构成:

  1. Producer(生产者):负责将消息发送到 Kafka 集群。生产者可以是各种应用程序,比如日志收集程序、业务系统中的事件产生模块等。它根据一定的分区策略,将消息发送到特定的 Topic 的指定分区中。
  2. Consumer(消费者):从 Kafka 集群中读取消息进行处理。消费者可以组成消费者组,同一组内的消费者共同消费一个 Topic 的消息,不同组的消费者可以独立消费同一个 Topic,互不影响。
  3. Broker(代理):Kafka 集群中的每一个节点就是一个 Broker。它负责接收生产者发送的消息,存储消息,并为消费者提供消息拉取服务。多个 Broker 共同组成 Kafka 集群,以实现高可用和水平扩展。
  4. Topic(主题):Kafka 中的逻辑概念,用于对消息进行分类。一个 Topic 可以被认为是一类消息的集合,比如“user - activity”主题可以用来存储所有与用户活动相关的消息。
  5. Partition(分区):每个 Topic 可以进一步划分为多个分区。分区是 Kafka 实现分布式存储和并行处理的基础。消息在分区内是有序的,不同分区之间的消息顺序不能保证。

Kafka 消息存储概述

Kafka 的消息存储机制是其高性能、高可靠性的关键所在。Kafka 将消息以追加写(append - only)的方式写入到磁盘上的日志文件中。这种方式避免了随机写操作,大大提高了写入性能。每个 Topic 的每个分区都对应一个物理的日志文件,消息按照顺序不断追加到这个日志文件中。

Kafka 使用了一种分段日志(Segmented Log)的设计,将一个分区的日志文件按照一定的规则切分成多个日志段(Log Segment)。每个日志段包含一定数量的消息,当一个日志段达到一定大小或者经过一定时间后,就会关闭当前日志段并创建一个新的日志段。这种分段设计有助于提高消息的读写效率,特别是在查找和删除旧消息时。

消息格式

Kafka 的消息格式经历了多个版本的演进,当前最新的消息格式是版本 2(在 Kafka 2.4.0 及更高版本中使用)。消息格式的设计对 Kafka 的性能和功能有着重要影响。

  1. 消息结构
    • RecordBatch:Kafka 以 RecordBatch 的形式存储和传输消息,这样可以提高存储和网络传输的效率。一个 RecordBatch 包含多个消息,同时还包含一些元数据,如批次的 CRC 校验和、时间戳等。
    • Record:单个消息的结构,包含消息的 key、value、时间戳等信息。在 Kafka 中,key 用于决定消息被发送到哪个分区(如果使用默认的分区策略),value 则是消息的实际内容。
  2. 版本 2 消息格式示例 以下是一个简化的示例,展示版本 2 消息格式在字节层面的大致结构(以 Java 代码简单模拟):
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.ByteBufferOutputStream;

import java.nio.ByteBuffer;

public class KafkaMessageFormatExample {
    public static void main(String[] args) {
        ByteBufferOutputStream bufferOutputStream = new ByteBufferOutputStream(1024);
        // 构建 RecordBatch
        RecordBatch.Builder batchBuilder = RecordBatch.builder(bufferOutputStream, RecordBatch.MAGIC_VALUE_V2,
                TimestampType.CREATE_TIME, 1000, 0, 0, 0, 0, 0, 0, 0);
        // 构建单个 Record
        byte[] key = "exampleKey".getBytes();
        byte[] value = "exampleValue".getBytes();
        batchBuilder.append(0, 0, TimestampType.CREATE_TIME, 0, key.length, key, value.length, value);
        RecordBatch batch = batchBuilder.build();
        ByteBuffer buffer = bufferOutputStream.getByteBuffer();
        // 这里 buffer 包含了按照版本 2 格式构建的 RecordBatch
        System.out.println("Message in version 2 format: " + buffer);
    }
}

在这个示例中,首先创建了一个 RecordBatch.Builder 来构建 RecordBatch,然后使用 append 方法添加单个 Record。最后构建出的 RecordBatch 被写入到 ByteBuffer 中,这个 ByteBuffer 中的内容就是按照版本 2 消息格式组织的。

日志段(Log Segment)

  1. 日志段的组成
    • 每个日志段由两个文件组成:一个是日志文件(以.log 为后缀),用于存储实际的消息内容;另一个是索引文件(以.index 为后缀),用于加速消息的查找。
    • 日志文件:消息以追加的方式顺序写入日志文件。日志文件中的每个消息都有一个偏移量(Offset),它是消息在分区中的唯一标识,从 0 开始单调递增。偏移量在 Kafka 中起着至关重要的作用,它不仅用于定位消息,还用于消费者记录自己消费到的位置。
    • 索引文件:索引文件采用稀疏索引的方式,记录了部分消息的偏移量和在日志文件中的物理位置。例如,索引文件中可能每隔一定数量的消息记录一条索引项,这样在查找消息时,可以先通过索引文件快速定位到大致的位置范围,然后再在日志文件中进行精确查找。
  2. 日志段的管理
    • 日志段的滚动(Rolling):当满足一定条件时,Kafka 会进行日志段的滚动,即关闭当前日志段并创建一个新的日志段。滚动的条件主要有两个:一是日志段大小达到配置的 log.segment.bytes 参数值(默认是 1GB);二是日志段的存活时间达到配置的 log.roll.hours 参数值(默认是 168 小时,即一周)。
    • 日志段的删除:Kafka 支持两种日志清理策略:删除(Delete)和压缩(Compact)。在删除策略下,当日志段中的消息达到一定的保留时间(由 log.retention.hours 等参数配置,默认 168 小时)或者日志段大小超过一定限制时,Kafka 会删除这些旧的日志段。在压缩策略下,Kafka 会保留每个 key 的最新消息,删除旧的消息版本,以减少存储空间的占用,适用于一些需要保留最新状态的场景,如存储用户的最新资料等。

索引机制

  1. 偏移量索引(Offset Index)
    • 偏移量索引是 Kafka 中最主要的索引类型,存储在.index 文件中。它的作用是根据消息的偏移量快速定位消息在日志文件中的物理位置。
    • 偏移量索引文件中的每一项包含两个 4 字节的整数,第一个整数表示消息的偏移量,第二个整数表示该消息在日志文件中的物理位置(相对于日志文件起始位置的字节偏移)。由于采用稀疏索引,并不是每个消息都有对应的索引项,而是每隔一定数量的消息(由 log.index.interval.bytes 参数配置,默认 4096 字节)记录一条索引项。
  2. 时间戳索引(Timestamp Index)
    • 时间戳索引是 Kafka 0.10.0.0 版本引入的,存储在.timeindex 文件中。它允许根据时间戳来查找消息,这在一些需要按照时间范围检索消息的场景中非常有用。
    • 时间戳索引文件的结构与偏移量索引类似,每一项包含两个 8 字节的长整数,第一个长整数表示消息的时间戳,第二个长整数表示对应的消息偏移量。同样,时间戳索引也是稀疏索引,通过 log.message.timestamp.difference.max.ms 参数配置索引的间隔,默认是 60000 毫秒(1 分钟)。
  3. 索引查找示例 以下是一个简单的示例,展示如何根据偏移量通过索引查找消息在日志文件中的位置(以 Python 代码简单模拟,实际 Kafka 内部使用更复杂的机制):
def find_message_position(offset, index_file_path):
    index_entries = []
    with open(index_file_path, 'rb') as index_file:
        while True:
            data = index_file.read(8)
            if not data:
                break
            index_offset = int.from_bytes(data[:4], byteorder='big')
            physical_offset = int.from_bytes(data[4:], byteorder='big')
            index_entries.append((index_offset, physical_offset))
    for i in range(len(index_entries) - 1):
        if index_entries[i][0] <= offset <= index_entries[i + 1][0]:
            # 这里简单假设线性查找,实际 Kafka 有更高效算法
            log_file_offset = index_entries[i][1]
            # 这里可以进一步根据偏移量在日志文件中读取消息
            return log_file_offset
    return None

在这个示例中,首先从索引文件中读取所有的索引项,然后通过比较偏移量来确定消息大致所在的位置范围,进而得到消息在日志文件中的物理偏移量。

副本机制

  1. 副本的概念
    • Kafka 通过副本机制来保证数据的高可用性和容错性。每个分区都可以有多个副本,其中一个副本被指定为领导者(Leader)副本,其余副本为追随者(Follower)副本。
    • 生产者发送消息时,消息首先被发送到领导者副本,然后领导者副本将消息同步给追随者副本。消费者从领导者副本读取消息。这样,当领导者副本所在的 Broker 发生故障时,Kafka 可以从追随者副本中选举出新的领导者副本,继续提供服务,保证数据不丢失。
  2. 副本同步流程
    • 领导者副本接收消息:生产者发送的消息到达领导者副本后,领导者副本将消息追加到本地日志文件,并更新其高水位(High Watermark,简称 HW)。高水位表示已成功同步到所有同步副本(In - Sync Replicas,简称 ISR)的消息偏移量。
    • 追随者副本同步消息:追随者副本定期从领导者副本拉取消息,将拉取到的消息追加到自己的本地日志文件,并更新自己的高水位。如果追随者副本在一定时间内(由 replica.lag.time.max.ms 参数配置,默认 10000 毫秒)没有向领导者副本发送拉取请求或者落后领导者副本太多消息(由 replica.lag.max.messages 参数配置,默认 4000 条),则该追随者副本会被从 ISR 中移除。
    • 故障处理:当领导者副本所在的 Broker 发生故障时,Kafka 会从 ISR 中的追随者副本中选举出新的领导者副本。新的领导者副本会继续接收生产者发送的消息,并为消费者提供服务。同时,Kafka 会尝试将之前未同步的消息同步到新的追随者副本,以保证数据的一致性。

写入性能优化

  1. 顺序写磁盘
    • Kafka 采用追加写的方式将消息写入日志文件,这种顺序写磁盘的方式比随机写磁盘具有更高的性能。在传统的文件系统中,随机写操作需要频繁的磁盘寻道,而顺序写则可以充分利用磁盘的顺序读写特性,减少寻道时间,提高 I/O 性能。
    • Kafka 通过将消息先写入操作系统的页缓存(Page Cache),然后由操作系统异步将页缓存中的数据刷写到磁盘上,进一步提高了写入性能。这样,生产者发送消息时,只需要将消息写入页缓存即可返回,而不需要等待消息真正持久化到磁盘,大大减少了消息发送的延迟。
  2. 批量处理
    • Kafka 支持生产者批量发送消息,以及 Broker 批量处理消息。生产者可以通过 batch.size 参数配置每个批次的大小(默认 16384 字节),当批次大小达到这个值或者经过 linger.ms 参数配置的时间(默认 0 毫秒)后,生产者将批次消息发送到 Kafka 集群。
    • Broker 在处理消息时,也会批量处理从生产者接收到的消息,然后批量写入日志文件。这种批量处理的方式减少了系统调用的次数,提高了整体的处理效率。

读取性能优化

  1. 页缓存与零拷贝
    • 页缓存:当消费者从 Kafka 读取消息时,Kafka 首先从操作系统的页缓存中查找消息。由于 Kafka 消息写入时先写入页缓存,所以大部分情况下,消费者可以直接从页缓存中读取到所需的消息,避免了磁盘 I/O 操作,大大提高了读取性能。
    • 零拷贝:Kafka 在向消费者传输消息时,采用了零拷贝技术。传统的文件读取和网络发送需要多次数据拷贝,而零拷贝技术通过直接将页缓存中的数据发送到网络套接字,减少了数据拷贝的次数,提高了数据传输的效率。在 Linux 系统中,Kafka 使用 sendfile 系统调用实现零拷贝,将文件数据直接从内核空间传输到网络套接字,避免了用户空间和内核空间之间的数据拷贝。
  2. 分段存储与索引
    • 前面提到的日志段分段存储和索引机制对读取性能有很大的提升。通过索引文件,Kafka 可以快速定位到消息所在的日志段和大致位置,然后在日志段内进行精确查找。这种机制避免了从整个日志文件中顺序查找消息,大大减少了查找时间,提高了读取性能。特别是在处理大规模消息存储时,分段存储和索引的优势更加明显。

Kafka 消息存储与其他系统的对比

  1. 与传统关系型数据库对比
    • 存储结构:传统关系型数据库采用结构化的表结构存储数据,数据存储在多个表中,通过主键和外键建立关联关系。而 Kafka 以日志文件的形式存储消息,消息以追加的方式顺序写入,不强调数据的结构化存储,更适合处理流式数据。
    • 读写性能:关系型数据库在随机读写少量数据时性能较好,但在处理大量顺序写入和顺序读取的场景下,由于其存储结构和索引机制的限制,性能不如 Kafka。Kafka 的顺序写磁盘和零拷贝等技术使其在高吞吐量的读写场景下表现出色。
    • 数据一致性:关系型数据库通过事务机制保证数据的一致性,在分布式环境下实现强一致性较为复杂。Kafka 通过副本机制保证数据的高可用性和一定程度的一致性,在分区内可以保证消息的顺序一致性,但不同分区之间的消息顺序无法保证。
  2. 与其他消息队列对比
    • RabbitMQ:RabbitMQ 更侧重于传统的消息队列场景,如任务队列、异步处理等。它支持多种消息传递模式,如点对点、发布 - 订阅等。在存储机制上,RabbitMQ 可以将消息存储在内存或者磁盘上,而 Kafka 主要依赖磁盘存储,更适合处理海量消息。在性能方面,Kafka 在高吞吐量的场景下性能优于 RabbitMQ,但 RabbitMQ 在低延迟和灵活性方面有一定优势。
    • RocketMQ:RocketMQ 也是一款高性能的分布式消息队列,在存储机制上与 Kafka 有一些相似之处,都采用顺序写磁盘和分段日志的方式。不过,RocketMQ 在事务消息支持等方面有更深入的功能,而 Kafka 在流处理和大数据集成方面有更多的生态支持。

总结 Kafka 消息存储机制的特点

  1. 高性能:通过顺序写磁盘、批量处理、页缓存和零拷贝等技术,Kafka 在消息的写入和读取方面都具有极高的性能,能够处理大规模的消息流。
  2. 高可靠性:副本机制保证了数据的高可用性和容错性,即使部分 Broker 发生故障,也能保证数据不丢失,继续提供服务。
  3. 可扩展性:Kafka 的分布式架构允许通过增加 Broker 节点来实现水平扩展,以适应不断增长的消息处理需求。
  4. 灵活性:支持多种消息清理策略和索引机制,能够满足不同场景下对消息存储和查询的需求。

深入理解 Kafka 的消息存储机制,对于在实际应用中合理配置和使用 Kafka,充分发挥其性能和功能优势具有重要意义。无论是构建大规模的分布式系统,还是处理实时流数据,掌握 Kafka 的消息存储原理都是关键的一步。