MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Kafka 架构日志清理策略与影响

2023-12-044.7k 阅读

Kafka 日志清理基础概念

在 Kafka 中,日志清理是维持集群高效运行以及合理利用存储资源的关键机制。Kafka 以日志的形式持久化存储消息,每个分区对应一个物理日志。日志由多个日志段(Log Segment)组成,每个日志段又包含多个消息记录。

日志段

日志段是 Kafka 日志管理的基本单位。每个日志段由一个 .log 文件(存储消息内容)和一个 .index 文件(存储消息偏移量索引)组成。当一个日志段的大小达到配置的 log.segment.bytes 阈值(默认 1GB),或者距离上一个日志段创建时间达到 log.roll.ms(默认 7 天),就会创建一个新的日志段。例如,假设当前日志段 00000000000000000000.log 达到了 log.segment.bytes 大小,Kafka 就会创建新的日志段 00000000000000000001.log

偏移量

偏移量(Offset)是 Kafka 中消息在日志中的唯一标识。它是一个 64 位的整数,从 0 开始单调递增。消费者通过偏移量来记录自己消费到的位置。比如,消费者当前消费到偏移量为 100 的消息,下次就会从偏移量 101 开始消费。

Kafka 日志清理策略类型

Kafka 提供了两种主要的日志清理策略:基于时间的清理策略(Log Retention Policy based on Time)和基于大小的清理策略(Log Retention Policy based on Size)。

基于时间的清理策略

这种策略通过设置 log.retention.hours(默认 168 小时,即 7 天)、log.retention.minuteslog.retention.ms 来控制日志的保留时间。一旦消息的保留时间超过设定值,Kafka 就会将其所在的日志段标记为可删除。例如,若设置 log.retention.hours = 24,那么 24 小时前写入的日志段在满足其他条件(如没有被任何消费者使用)时,就会被清理。

配置示例:

log.retention.hours=24

基于大小的清理策略

基于大小的策略通过设置 log.retention.bytes 来控制日志的总大小。当分区日志的大小超过这个阈值时,Kafka 会从最早的日志段开始删除,直到日志大小低于阈值。例如,设置 log.retention.bytes = 1073741824(即 1GB),当分区日志大小超过 1GB 时,就会触发清理。

配置示例:

log.retention.bytes=1073741824

日志清理策略的实现原理

清理线程

Kafka 中有专门的日志清理线程负责执行日志清理任务。这些线程定期检查各个分区的日志,根据配置的清理策略判断哪些日志段可以被删除。清理线程在 KafkaLogManager 类中进行管理,它会遍历所有的分区日志,并调用 KafkaLog 类中的清理方法。

日志段删除

当一个日志段被标记为可删除时,清理线程会首先关闭该日志段对应的文件句柄,然后将其从文件系统中删除。同时,相关的索引文件(.index)也会被一并删除。例如,在删除 00000000000000000000.log 日志段时,00000000000000000000.index 也会被删除。

日志清理策略对 Kafka 性能的影响

对读写性能的影响

  • 读性能:在日志清理过程中,如果正在读取的日志段被标记为可删除,可能会导致短暂的读取中断。因为清理线程在删除日志段时会先关闭文件句柄,此时若有消费者尝试读取该日志段,就会失败。不过,Kafka 会尽量保证清理操作不会影响到活跃的消费者,通过维护活跃消费者的偏移量信息,确保清理操作不会删除正在被消费的日志段。
  • 写性能:日志清理过程可能会占用一定的磁盘 I/O 资源,从而对写入性能产生影响。尤其是在大规模删除日志段时,磁盘 I/O 压力会增大。为了缓解这种影响,Kafka 采用了异步清理的方式,即清理操作在后台线程中执行,尽量不影响主线程的写入操作。

对存储资源的影响

合理的日志清理策略可以有效控制 Kafka 集群的存储资源占用。基于时间的清理策略能确保旧数据及时被删除,避免无用数据长期占用存储空间。基于大小的清理策略则可以根据集群的存储容量动态调整日志的保留量。例如,在存储资源紧张的情况下,设置较小的 log.retention.bytes 可以及时清理日志,释放空间。但如果清理策略设置不当,可能会导致数据过早删除,使得一些需要长期保存的数据丢失。

日志清理策略对 Kafka 数据一致性的影响

消费者视角

如果日志清理策略导致消费者尚未消费的消息被删除,就会出现数据丢失的情况,从而破坏数据一致性。例如,消费者由于某些原因(如网络故障、应用程序故障)长时间未消费消息,而此时 Kafka 根据清理策略删除了相关日志段,那么消费者就无法再获取这些消息。为了避免这种情况,Kafka 提供了 auto.offset.reset 配置选项,当消费者发现偏移量对应的消息已被删除时,可以选择从最早的消息(earliest)或最新的消息(latest)开始消费。

配置示例:

auto.offset.reset=earliest

生产者视角

从生产者角度来看,日志清理策略一般不会直接影响数据一致性。只要生产者成功将消息发送到 Kafka 并得到确认,消息就会被持久化存储。然而,如果 Kafka 在消息确认后但在日志清理之前发生故障,且故障导致部分日志丢失,那么可能会出现数据不一致的情况。为了提高数据可靠性,Kafka 提供了副本机制,通过将消息复制到多个副本中,确保即使部分副本出现故障,数据仍然可用。

日志清理策略的优化与调优

合理配置清理参数

根据业务需求和集群资源情况,合理设置日志清理参数是优化的关键。对于一些实时性要求高但数据重要性较低的场景,可以设置较短的 log.retention.hours 和较小的 log.retention.bytes,以快速释放存储资源。而对于需要长期保存数据的场景,则应适当增大这些参数值。例如,在监控数据的采集场景中,数据的时效性较强,设置 log.retention.hours = 12 可能就足够了;而在金融交易数据存储场景中,可能需要设置 log.retention.days = 365 来长期保存数据。

结合业务场景调整策略

不同的业务场景对日志清理有不同的需求。例如,在物联网设备数据采集场景中,设备会持续产生大量数据,此时可以采用基于大小的清理策略,并结合定期的数据归档操作。当日志大小达到一定阈值时,先将部分日志归档到长期存储中(如 HDFS),然后再删除 Kafka 中的日志段,这样既能保证数据的长期保存,又能控制 Kafka 集群的存储压力。

监控与动态调整

通过 Kafka 的监控工具(如 Kafka Manager、Prometheus + Grafana 等)实时监控日志的大小、清理频率等指标。根据监控数据动态调整日志清理策略。例如,如果发现某个分区的日志增长过快,且存储资源即将耗尽,可以临时减小 log.retention.byteslog.retention.hours,待资源压力缓解后再恢复到正常配置。

代码示例:自定义 Kafka 日志清理策略

虽然 Kafka 提供了默认的日志清理策略,但在某些特殊场景下,可能需要自定义清理策略。以下是一个简单的自定义日志清理策略的代码示例,基于 Kafka 2.6.0 版本。

首先,创建一个自定义的日志清理策略类 CustomLogCleanupPolicy,继承自 LogCleaner.CleanupPolicy

import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.log.Log;
import org.apache.kafka.log.LogCleaner;
import org.apache.kafka.log.LogCleanerConfig;
import org.apache.kafka.log.LogCleaner.CleanupPolicy;
import org.apache.kafka.log.LogCleanerStats;
import org.apache.kafka.log.LogSegment;
import org.apache.kafka.log.OffsetIndex;
import org.apache.kafka.log.Segment;
import org.apache.kafka.log.SegmentBase;
import org.apache.kafka.log.internals.LogSegmentMetadata;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class CustomLogCleanupPolicy implements CleanupPolicy {

    private final LogCleanerConfig config;
    private final Time time;
    private final LogCleanerStats stats;
    private final AtomicInteger totalSegmentsCleaned = new AtomicInteger(0);

    public CustomLogCleanupPolicy(LogCleanerConfig config, Time time, LogCleanerStats stats) {
        this.config = config;
        this.time = time;
        this.stats = stats;
    }

    @Override
    public List<LogSegment> segmentsToDelete(Log log) {
        List<LogSegment> segmentsToDelete = new ArrayList<>();
        long currentTime = time.milliseconds();
        for (LogSegment segment : log) {
            // 自定义清理逻辑,例如删除创建时间超过特定时长的日志段
            if (currentTime - segment.baseOffsetTimeMs() > 10 * 24 * 3600 * 1000) {
                segmentsToDelete.add(segment);
            }
        }
        return segmentsToDelete;
    }

    @Override
    public void close() {
        // 清理资源的逻辑
    }

    @Override
    public void cleanup(Log log, LogSegment segmentToClean, Map<File, OffsetIndex> indexCache, CompressionType targetCompressionType) {
        // 执行清理操作,例如删除日志段文件
        segmentToClean.delete();
        totalSegmentsCleaned.incrementAndGet();
    }

    @Override
    public void stats(LogCleanerStats stats) {
        stats.recordSegmentsCleaned(totalSegmentsCleaned.get());
    }
}

然后,在 Kafka 配置文件中指定使用自定义的日志清理策略:

log.cleaner.policy.class=com.example.CustomLogCleanupPolicy

在上述代码中,CustomLogCleanupPolicy 类实现了 CleanupPolicy 接口,重写了 segmentsToDelete 方法来定义哪些日志段需要被删除,cleanup 方法来执行实际的清理操作,close 方法用于清理资源,stats 方法用于记录清理统计信息。通过在 Kafka 配置文件中指定 log.cleaner.policy.class,就可以使用自定义的日志清理策略。

不同清理策略在高可用场景下的表现

多副本场景

在 Kafka 的多副本机制下,日志清理策略需要确保各个副本之间的数据一致性。当一个副本根据清理策略删除了日志段时,其他副本也需要相应地进行删除操作。Kafka 通过 ISR(In - Sync Replicas)机制来保证这一点。只有在 ISR 中的副本完成了日志段的删除操作后,才认为清理操作成功。例如,假设某个分区有 3 个副本(副本 0 为主副本,副本 1 和副本 2 为从副本),当主副本根据清理策略删除了一个日志段后,它会将删除操作同步给副本 1 和副本 2,只有当副本 1 和副本 2 也成功删除该日志段后,整个清理操作才完成。

故障恢复场景

在 Kafka 集群发生故障后恢复时,日志清理策略需要正确处理已删除和未删除的日志段。如果在故障发生前某个日志段已经被标记为可删除但尚未实际删除,在恢复过程中,Kafka 需要根据配置的清理策略重新评估该日志段是否仍然需要删除。例如,假设在故障前一个日志段由于超过了 log.retention.hours 被标记为可删除,但还没来得及删除集群就发生了故障。在恢复后,Kafka 会再次检查该日志段的创建时间,如果仍然超过 log.retention.hours,则会继续执行删除操作。

总结不同清理策略的适用场景

基于时间的清理策略适用于对数据时效性要求较高的场景,如实时监控数据、用户行为分析数据等。这些场景下,数据的价值随着时间的推移迅速降低,不需要长期保存。例如,网站的实时访问日志,一般只需要保存几天用于分析近期的用户行为,设置 log.retention.hours = 72 就可以满足需求。

基于大小的清理策略适合存储资源有限的场景,或者对数据总量有严格限制的场景。例如,在一些嵌入式设备或存储资源紧张的边缘节点上运行的 Kafka 集群,通过设置 log.retention.bytes 可以有效控制日志的大小,避免因日志无限增长导致存储资源耗尽。

在实际应用中,也可以结合两种策略,根据业务数据的特点和需求进行灵活配置。例如,对于一些重要的业务数据,先设置较长的 log.retention.hours 以保证数据的长期可用,同时设置 log.retention.bytes 作为存储资源的兜底限制,防止日志无限制增长。

通过深入理解 Kafka 日志清理策略及其影响,并结合实际业务场景进行合理配置和优化,可以确保 Kafka 集群高效、稳定地运行,同时有效利用存储资源,保证数据的一致性和完整性。