MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Kafka 消息的可靠性投递保障

2023-06-056.5k 阅读

Kafka 消息可靠性投递保障概述

在分布式系统中,Kafka 作为一款高性能的消息队列,被广泛应用于数据的实时处理和异步通信场景。然而,在实际应用中,确保 Kafka 消息的可靠投递至关重要。消息的丢失或重复可能会导致数据不一致、业务逻辑错误等问题,严重影响系统的稳定性和正确性。

Kafka 从多个方面来保障消息的可靠性投递,这涉及到生产者端、消费者端以及 Kafka 集群本身的配置和机制。理解这些方面并合理配置和使用,是构建可靠消息传递系统的关键。

生产者端的可靠性保障

1. 消息发送模式

Kafka 生产者有三种消息发送模式:发后即忘(fire - and - forget)同步发送(sync send)异步发送(async send)

  • 发后即忘:生产者调用 send() 方法发送消息后,不等待 Kafka 集群的确认就继续执行后续代码。这种方式性能最高,但可靠性最低,因为如果消息发送过程中出现网络故障等问题,生产者无法得知消息是否成功发送。示例代码如下:
Producer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("my - topic", "key", "value");
producer.send(record);
  • 同步发送:生产者调用 send() 方法发送消息后,通过 get() 方法阻塞等待 Kafka 集群的确认。只有收到确认后,生产者才继续执行后续代码。这种方式可靠性较高,但性能相对较低,因为阻塞会影响生产者的吞吐量。示例代码如下:
Producer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("my - topic", "key", "value");
try {
    RecordMetadata metadata = producer.send(record).get();
    System.out.println("Message sent to partition " + metadata.partition() +
        " at offset " + metadata.offset());
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
  • 异步发送:生产者调用 send() 方法发送消息时,传入一个回调函数。Kafka 集群处理完消息后,会调用这个回调函数告知生产者消息发送的结果。这种方式在保证一定可靠性的同时,也能维持较高的性能。示例代码如下:
Producer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("my - topic", "key", "value");
producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            exception.printStackTrace();
        } else {
            System.out.println("Message sent to partition " + metadata.partition() +
                " at offset " + metadata.offset());
        }
    }
});

2. 配置参数对可靠性的影响

  • acks 参数:该参数指定了生产者在收到多少个副本的确认后才认为消息发送成功。
    • acks = 0:生产者发送消息后,不等待任何副本的确认,直接认为消息发送成功。这种情况下,消息最容易丢失,例如在消息发送到 Kafka 集群但还未被任何副本接收时,若发生网络故障,消息就会丢失。
    • acks = 1:生产者发送消息后,只要 Leader 副本接收到消息并写入本地日志,就认为消息发送成功。如果此时 Leader 副本在将消息同步给其他副本之前发生故障,消息可能会丢失。
    • acks = allacks = -1:生产者发送消息后,需要等待所有同步副本(ISR 中的副本)都接收到消息并写入本地日志,才认为消息发送成功。这种配置下,消息丢失的可能性最小,但会降低系统的性能,因为需要等待多个副本的确认。
  • retries 参数:当消息发送失败时,生产者会根据 retries 参数指定的次数进行重试。默认情况下,retries = 0,即不进行重试。如果设置了合理的 retries 值,例如 retries = 3,当消息发送因为网络闪断等瞬时故障失败时,生产者会自动重试 3 次,提高消息成功发送的概率。
  • linger.ms 参数:该参数指定了生产者在将消息批次发送到 Kafka 集群之前等待的时间(毫秒)。默认值为 0,即生产者会立即发送消息。如果设置一个大于 0 的值,例如 linger.ms = 5,生产者会在 5 毫秒内收集更多的消息形成批次再发送,这样可以提高吞吐量,但也会增加消息的发送延迟。在一些对延迟要求不高的场景下,适当增大 linger.ms 的值有助于减少网络请求次数,提高消息发送的可靠性。

消费者端的可靠性保障

1. 消费模式与消息确认

Kafka 消费者有两种消费模式:自动提交偏移量(auto - commit)手动提交偏移量(manual - commit)

  • 自动提交偏移量:消费者在配置 enable.auto.commit = true 时,会定期(由 auto.commit.interval.ms 参数指定间隔时间)自动将已消费消息的偏移量提交给 Kafka 集群。这种方式简单方便,但可能会导致消息的重复消费。例如,在消费者消费了一批消息但还未自动提交偏移量时,消费者发生故障重启,由于偏移量未提交,重启后的消费者会再次消费这批消息。示例代码如下:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my - group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "5000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my - topic"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println("Received message: " + record.value());
    }
}
  • 手动提交偏移量:消费者在配置 enable.auto.commit = false 时,需要手动调用 commitSync()commitAsync() 方法来提交偏移量。commitSync() 方法是同步提交,会阻塞当前线程直到偏移量提交成功;commitAsync() 方法是异步提交,不会阻塞线程,但可能会因为提交失败而丢失提交结果。手动提交偏移量可以让开发者更精确地控制消息的消费进度,避免消息的重复消费。示例代码如下:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my - group");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my - topic"));
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println("Received message: " + record.value());
        }
        consumer.commitSync();
    }
} catch (Exception e) {
    e.printStackTrace();
} finally {
    consumer.close();
}

2. 处理反压

在高负载情况下,消费者可能无法及时处理接收到的消息,导致消费速度跟不上生产速度,这就是所谓的“反压”问题。如果不处理反压,可能会导致 Kafka 集群的内存溢出等问题,进而影响消息的可靠性。

  • 调整消费者处理能力:优化消费者的业务逻辑,提高消息处理速度。例如,减少不必要的 I/O 操作、优化算法等。
  • 增加消费者实例:通过增加消费者实例的数量,可以提高整体的消费能力。在 Kafka 中,可以通过调整 consumer group 中的消费者数量来实现。
  • 使用流处理框架:如 Kafka Streams 或 Flink 等流处理框架,它们提供了更高级的反压处理机制。例如,Kafka Streams 可以自动调整内部缓冲区的大小,以适应不同的负载情况。

Kafka 集群的可靠性保障

1. 副本机制

Kafka 通过副本机制来提高数据的可靠性。每个分区都可以配置多个副本,其中一个副本为 Leader 副本,其他副本为 Follower 副本。生产者发送的消息首先会被写入 Leader 副本,然后 Leader 副本会将消息同步给 Follower 副本。

  • ISR(In - Sync Replicas):ISR 是一组与 Leader 副本保持同步的 Follower 副本集合。只有 ISR 中的副本被认为是“同步”的,生产者在 acks = all 配置下,需要等待 ISR 中的所有副本都确认收到消息,才认为消息发送成功。如果某个 Follower 副本与 Leader 副本的差距超过一定阈值(由 replica.lag.time.max.ms 参数指定),该 Follower 副本会被从 ISR 中移除。当该 Follower 副本重新追上 Leader 副本时,会重新加入 ISR。
  • 副本选举:当 Leader 副本发生故障时,Kafka 会从 ISR 中选举一个新的 Leader 副本。选举算法基于副本的 LEO(Log End Offset,副本日志的最后偏移量),LEO 最大的副本通常会被选举为新的 Leader 副本。这确保了新的 Leader 副本拥有最新的数据,从而保证消息的可靠性。

2. 日志清理策略

Kafka 提供了两种日志清理策略:删除策略(delete)压缩策略(compact)

  • 删除策略:按照一定的规则删除过期的日志段。可以通过 log.retention.hourslog.retention.minuteslog.retention.ms 参数指定日志保留的时间,也可以通过 log.retention.bytes 参数指定日志保留的最大字节数。当达到这些阈值时,Kafka 会删除旧的日志段。这种策略适用于对历史数据要求不高的场景,例如实时监控数据等。
  • 压缩策略:对于每个消息 key,只保留最新的消息。Kafka 会遍历日志段,将相同 key 的旧消息删除,只保留最新的一条。这种策略适用于需要保留每个 key 的最新状态的场景,例如用户信息的更新等。合理选择日志清理策略有助于保证 Kafka 集群的磁盘空间使用和消息的可靠性,避免因磁盘空间不足导致消息无法写入的情况。

端到端的可靠性保障示例

下面通过一个完整的示例来展示如何从生产者端、消费者端以及 Kafka 集群配置来保障消息的端到端可靠性。

1. 生产者配置与代码

import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class ReliableProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 3);
        props.put("linger.ms", 5);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>("reliable - topic", "key" + i, "value" + i);
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        exception.printStackTrace();
                    } else {
                        System.out.println("Message sent to partition " + metadata.partition() +
                            " at offset " + metadata.offset());
                    }
                }
            });
        }
        producer.close();
    }
}

在这个生产者代码中,我们设置 acks = all 确保所有同步副本确认消息,retries = 3 进行重试,linger.ms = 5 以提高吞吐量。

2. Kafka 集群配置

server.properties 文件中,设置以下关键参数来保障可靠性:

# 副本同步的最大延迟时间
replica.lag.time.max.ms = 10000
# 日志保留时间为 7 天
log.retention.hours = 168
# 每个分区的副本数
num.replica.fetchers = 2

这些配置确保了副本同步的及时性,合理的日志保留时间以及足够的副本数量。

3. 消费者配置与代码

import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class ReliableConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "reliable - group");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("reliable - topic"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Received message: " + record.value());
                }
                consumer.commitSync();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}

消费者配置 enable.auto.commit = false 并手动提交偏移量,确保消息不被重复消费。

通过以上生产者、消费者和 Kafka 集群的配置和代码示例,可以构建一个相对可靠的消息投递系统,在实际应用中,还需要根据具体的业务需求和性能要求进行进一步的优化和调整。

常见问题及解决方案

1. 消息丢失问题

  • 原因分析:可能是生产者端 acks 配置不合理,如设置为 01 且 Leader 副本未及时同步给 Follower 副本就发生故障;也可能是消费者端在自动提交偏移量模式下,消费消息后还未提交偏移量就发生故障。
  • 解决方案:在生产者端设置 acks = all,并合理设置 retries 参数;在消费者端使用手动提交偏移量模式,并在消息处理成功后及时提交偏移量。

2. 消息重复问题

  • 原因分析:主要是消费者端在自动提交偏移量模式下,由于提交延迟或故障导致偏移量未及时更新,重启后重复消费。另外,生产者在重试机制下,可能因为网络等问题导致消息重复发送。
  • 解决方案:消费者端采用手动提交偏移量,并在处理完一批消息后再提交;生产者端可以通过幂等性生产者(enable.idempotence = true)来避免重复发送,幂等性生产者会为每个消息分配一个唯一的序列号,Kafka 集群会根据序列号来判断是否为重复消息并进行去重。

3. Kafka 集群性能问题

  • 原因分析:副本数量过多会增加同步开销,日志清理策略不合理可能导致磁盘 I/O 压力过大,消费者处理能力不足会导致反压等。
  • 解决方案:根据实际业务需求合理配置副本数量,选择合适的日志清理策略;优化消费者业务逻辑,增加消费者实例数量或使用流处理框架来处理反压。

总结与展望

确保 Kafka 消息的可靠性投递是构建稳定、高效分布式系统的关键。通过在生产者端、消费者端以及 Kafka 集群进行合理的配置和优化,可以有效提高消息投递的可靠性。同时,要注意不同配置对系统性能的影响,在可靠性和性能之间找到平衡。随着分布式系统的不断发展,Kafka 也在持续演进,未来可能会有更强大的可靠性保障机制和优化策略出现,开发者需要不断关注和学习,以更好地应用 Kafka 构建可靠的消息传递系统。在实际项目中,还需要结合具体的业务场景,对 Kafka 的可靠性保障机制进行灵活运用和优化,以满足业务对数据一致性和准确性的要求。例如,在金融领域的交易系统中,对消息的可靠性要求极高,需要采用严格的配置和验证机制;而在一些实时监控场景中,可能对消息的实时性要求更高,在保障可靠性的同时,要尽量减少延迟。总之,深入理解 Kafka 消息可靠性投递保障机制,并根据实际情况进行优化,是开发者在使用 Kafka 过程中的重要任务。