MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

保障 Kafka 消息可靠性的实用技巧

2023-07-233.6k 阅读

Kafka 消息可靠性概述

Kafka 作为一款高吞吐量的分布式消息系统,在数据传输和处理流程中,消息的可靠性是至关重要的考量因素。所谓消息可靠性,简单来说,就是确保消息在生产、传输和消费过程中不会丢失、重复或损坏。

在 Kafka 中,消息可靠性的保障面临着诸多挑战。从生产端来看,网络波动、Broker 节点故障等都可能导致消息发送失败。在传输过程中,集群内部的复制机制虽然能够提高数据的可用性,但也存在复制延迟或不完全的风险。消费端则可能因为处理逻辑的异常、消费者故障等情况,未能正确消费消息。

Kafka 消息传递语义

  1. 最多一次(At most once):生产者发送消息后,不关心消息是否真正到达 Broker。这种语义下,消息可能会丢失,但不会重复。例如,在网络不稳定的情况下,生产者可能在发送消息后没有收到 Broker 的确认,就直接认为消息发送成功,而实际上消息可能并未被 Broker 接收。
  2. 最少一次(At least once):生产者确保消息至少被发送一次。如果没有收到 Broker 的确认,会重试发送。这种语义下,消息不会丢失,但可能会重复。比如,生产者在重试发送消息时,可能会因为网络延迟,Broker 先接收到了第一次发送的消息,然后又接收到了重试发送的相同消息。
  3. 精确一次(Exactly once):这是最严格的语义,确保消息只被发送且处理一次,既不丢失也不重复。Kafka 从 0.11.0.0 版本开始引入了幂等性生产者和事务,以支持精确一次语义。

保障生产端消息可靠性

同步发送与异步发送

  1. 同步发送
    • 同步发送是指生产者发送消息后,等待 Broker 返回确认响应,只有收到响应后才继续发送下一条消息。这种方式可以确保消息的发送成功,但可能会影响性能,因为生产者在等待确认时处于阻塞状态。
    • 以下是 Java 中使用 Kafka 同步发送消息的代码示例:
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class KafkaSyncProducer {
    public static void main(String[] args) {
        // 设置 Kafka 生产者属性
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 创建 Kafka 生产者
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // 构建消息
        ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "key1", "message1");

        try {
            // 同步发送消息
            RecordMetadata metadata = producer.send(record).get();
            System.out.println("Message sent successfully to partition " + metadata.partition() +
                    " at offset " + metadata.offset());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}
  1. 异步发送
    • 异步发送是指生产者发送消息后,不等待 Broker 的确认响应,继续发送下一条消息。生产者通过回调函数来处理发送结果。这种方式可以提高发送性能,但需要更复杂的错误处理逻辑,因为在消息发送过程中可能会出现错误,而生产者可能已经继续发送了其他消息。
    • 以下是 Java 中使用 Kafka 异步发送消息并添加回调的代码示例:
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class KafkaAsyncProducer {
    public static void main(String[] args) {
        // 设置 Kafka 生产者属性
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 创建 Kafka 生产者
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // 构建消息
        ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "key1", "message1");

        // 异步发送消息并添加回调
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception == null) {
                    System.out.println("Message sent successfully to partition " + metadata.partition() +
                            " at offset " + metadata.offset());
                } else {
                    System.out.println("Error sending message: " + exception.getMessage());
                }
            }
        });

        // 关闭生产者
        producer.close();
    }
}

配置生产者参数

  1. acks 参数
    • acks 参数决定了生产者在等待 Broker 确认消息接收时的行为。它有三个可选值:
      • acks = 0:生产者发送消息后,不等待 Broker 的确认,直接继续发送下一条消息。这种方式速度最快,但消息可靠性最低,在网络不稳定或 Broker 故障时,消息很可能丢失。
      • acks = 1:生产者发送消息后,等待 Leader 副本确认消息已成功写入本地日志。如果 Leader 在确认消息后但在将消息复制到 Follower 副本之前发生故障,消息可能会丢失。
      • acks = allacks = -1:生产者发送消息后,等待所有同步副本(ISR 中的副本)确认消息已成功写入。这种方式可以最大程度保障消息可靠性,但性能相对较低,因为需要等待多个副本的确认。
  2. retries 参数
    • retries 参数设置了生产者在发送消息失败时的重试次数。默认情况下,重试次数为 0。当设置一个大于 0 的值时,如果消息发送失败,生产者会按照指定的次数重试发送。例如,如果网络波动导致消息发送失败,生产者可以重试,提高消息成功发送的概率。但需要注意的是,过多的重试可能会导致消息重复,尤其是在 Broker 已经成功接收消息但确认响应丢失的情况下。
  3. max.in.flight.requests.per.connection 参数
    • max.in.flight.requests.per.connection 参数限制了生产者在单个连接上未完成的请求数量。如果设置为 1,可以保证消息的顺序性,但会降低性能。因为在一个请求未完成(未收到 Broker 确认)之前,生产者不能发送下一个请求。如果设置较大的值,可以提高性能,但可能会导致消息顺序混乱,因为多个请求可能会以不同的顺序被确认。

使用幂等性生产者

  1. 幂等性原理
    • 幂等性生产者通过使用生产者 ID(PID)和序列号(Sequence Number)来确保消息的精确一次发送。当生产者启动时,Kafka 会为其分配一个唯一的 PID。对于每个 Topic - Partition,生产者会维护一个序列号,每次发送消息时,序列号递增。Broker 会缓存每个 PID 对应的每个 Topic - Partition 的最后一个序列号。当 Broker 接收到消息时,会检查消息的序列号是否比缓存的序列号大 1。如果是,则接受消息并更新缓存的序列号;如果不是,则拒绝消息。这样可以避免重复消息的写入。
  2. 配置幂等性生产者
    • 在 Kafka 中配置幂等性生产者非常简单,只需要在生产者配置中设置 enable.idempotence = true 即可。以下是一个配置示例:
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
  • 幂等性生产者在单个会话内可以保证消息的精确一次语义,但在跨会话(例如生产者重启)的情况下,无法保证完全的精确一次。如果需要跨会话的精确一次语义,需要使用事务。

使用事务

  1. 事务原理
    • Kafka 的事务允许生产者将多个消息发送操作组合成一个原子性的操作。生产者使用 beginTransaction() 方法开始一个事务,然后进行多个消息发送操作,最后使用 commitTransaction() 方法提交事务。如果在事务执行过程中发生错误,生产者可以使用 abortTransaction() 方法回滚事务。
    • 在事务中,Kafka 会使用一个事务协调器(Transaction Coordinator)来管理事务状态。事务协调器为每个生产者分配一个唯一的事务 ID,生产者使用这个事务 ID 来标记事务中的所有消息。这样,即使生产者在事务执行过程中重启,也可以通过事务 ID 恢复事务状态,确保事务中的所有消息要么全部成功提交,要么全部回滚。
  2. 使用事务的代码示例
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class KafkaTransactionProducer {
    public static void main(String[] args) {
        // 设置 Kafka 生产者属性
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        // 创建 Kafka 生产者
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // 初始化事务
        producer.initTransactions();

        try {
            // 开始事务
            producer.beginTransaction();

            // 发送消息
            ProducerRecord<String, String> record1 = new ProducerRecord<>("test-topic", "key1", "message1");
            ProducerRecord<String, String> record2 = new ProducerRecord<>("test-topic", "key2", "message2");
            producer.send(record1);
            producer.send(record2);

            // 提交事务
            producer.commitTransaction();
        } catch (Exception e) {
            // 回滚事务
            producer.abortTransaction();
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}

保障 Broker 端消息可靠性

副本机制与 ISR

  1. 副本机制
    • Kafka 通过副本机制来提高数据的可用性和容错性。每个 Topic 的每个 Partition 都可以有多个副本,其中一个副本被选举为 Leader,其他副本为 Follower。生产者发送的消息首先被写入 Leader 副本,然后 Leader 副本将消息复制到 Follower 副本。消费者从 Leader 副本读取消息。
    • 当 Leader 副本发生故障时,Kafka 会从 Follower 副本中选举一个新的 Leader。这样可以确保即使某个节点发生故障,消息仍然可以被正常读写。
  2. ISR(In - Sync Replicas)
    • ISR 是指与 Leader 副本保持同步的 Follower 副本集合。只有在 ISR 中的副本才被认为是可靠的,可以参与 Leader 选举。Kafka 会动态维护 ISR,当 Follower 副本与 Leader 副本的差距超过一定阈值(由 replica.lag.time.max.ms 参数配置,默认 10000 毫秒)时,会将该 Follower 副本从 ISR 中移除;当 Follower 副本重新追上 Leader 副本时,会重新加入 ISR。
    • 为了保障消息可靠性,当 acks = all 时,只有 ISR 中的所有副本都确认收到消息,生产者才会认为消息发送成功。因此,合理配置 ISR 相关参数对于保障消息可靠性至关重要。如果 ISR 中的副本数量过少,可能会在 Leader 故障时丢失消息;如果 ISR 中的副本数量过多,可能会影响消息的写入性能。

日志留存策略

  1. 基于时间的留存策略
    • Kafka 可以根据消息的创建时间来决定消息的留存时间。通过 log.retention.hours(默认 168 小时,即 7 天)、log.retention.minuteslog.retention.ms 参数可以配置消息的留存时间。当消息的创建时间超过配置的留存时间时,Kafka 会自动删除这些消息。
    • 例如,如果将 log.retention.hours 设置为 24,那么 Kafka 会删除所有创建时间超过 24 小时的消息。这种策略适用于对历史数据需求不高,希望及时清理磁盘空间的场景。
  2. 基于大小的留存策略
    • Kafka 还可以根据日志文件的大小来决定消息的留存策略。通过 log.retention.bytes 参数可以配置每个 Partition 的日志文件大小上限。当 Partition 的日志文件大小超过这个上限时,Kafka 会删除最早的消息,以腾出空间。
    • 例如,如果将 log.retention.bytes 设置为 1073741824(即 1GB),当某个 Partition 的日志文件大小达到 1GB 时,Kafka 会开始删除最早的消息,直到日志文件大小低于 1GB。
  3. 综合留存策略
    • Kafka 会同时考虑基于时间和基于大小的留存策略,只要满足其中一个条件,就会删除消息。例如,如果同时设置了 log.retention.hours = 24log.retention.bytes = 1GB,当消息创建时间超过 24 小时或者日志文件大小超过 1GB 时,都会触发消息删除操作。合理配置这两个留存策略可以在保障消息可靠性(确保重要消息不会过早删除)和节省磁盘空间之间找到平衡。

保障消费端消息可靠性

手动提交位移

  1. 自动提交位移的问题
    • Kafka 默认采用自动提交位移的方式,消费者定期(由 auto.commit.interval.ms 参数配置,默认 5000 毫秒)将已消费消息的位移提交给 Kafka。这种方式虽然简单,但存在消息丢失和重复消费的风险。
    • 例如,假设消费者每 5 秒自动提交一次位移,在一次提交后,消费者开始处理消息。但在处理过程中,消费者发生故障,而此时还有部分消息未处理完成。当消费者重启后,由于之前已经提交了位移,它会从上次提交的位移处继续消费,导致未处理完成的消息丢失。另一方面,如果消费者在处理完消息但还未提交位移时发生故障,重启后会重复消费这些已经处理过的消息。
  2. 手动提交位移的方式
    • 手动提交位移可以避免自动提交位移带来的问题。消费者在成功处理完消息后,手动调用 commitSync()commitAsync() 方法来提交位移。
    • commitSync() 方法是同步提交,它会等待 Kafka 确认位移提交成功后才返回。如果提交失败,它会重试,直到提交成功或者抛出异常。以下是使用 commitSync() 方法手动提交位移的代码示例:
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Collections;
import java.util.Properties;

public class KafkaManualCommitSyncConsumer {
    public static void main(String[] args) {
        // 设置 Kafka 消费者属性
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        // 创建 Kafka 消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // 订阅主题
        consumer.subscribe(Collections.singletonList("test-topic"));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Received message: " + record.value());
                    // 处理消息
                    //...
                }
                // 手动同步提交位移
                consumer.commitSync();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}
  • commitAsync() 方法是异步提交,它不会等待 Kafka 的确认响应,直接返回。这种方式可以提高性能,但如果提交失败,不会自动重试。可以通过添加回调函数来处理提交结果。以下是使用 commitAsync() 方法并添加回调的代码示例:
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Collections;
import java.util.Properties;

public class KafkaManualCommitAsyncConsumer {
    public static void main(String[] args) {
        // 设置 Kafka 消费者属性
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        // 创建 Kafka 消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // 订阅主题
        consumer.subscribe(Collections.singletonList("test-topic"));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Received message: " + record.value());
                    // 处理消息
                    //...
                }
                // 手动异步提交位移并添加回调
                consumer.commitAsync(new OffsetCommitCallback() {
                    @Override
                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                        if (exception!= null) {
                            System.out.println("Commit failed: " + exception.getMessage());
                        }
                    }
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}

处理消费逻辑异常

  1. 消费逻辑异常类型
    • 在消费消息时,可能会出现各种类型的异常。例如,消息格式不符合预期,导致反序列化失败;业务逻辑处理过程中出现错误,如数据库插入失败等。这些异常如果不妥善处理,可能会导致消息无法被正确消费,从而影响消息的可靠性。
  2. 异常处理策略
    • 对于消息反序列化异常,可以在消费者配置中设置合适的反序列化器,并在反序列化失败时进行日志记录或采取其他补偿措施。例如,可以将反序列化失败的消息发送到一个专门的 Topic 中,以便后续分析处理。
    • 对于业务逻辑处理异常,可以采用重试机制。例如,在数据库插入失败时,可以重试一定次数。如果重试多次仍失败,可以将消息发送到一个死信队列(Dead Letter Queue,DLQ)中。死信队列是一个专门用于存放处理失败消息的 Topic,后续可以由人工或其他程序对这些消息进行处理。
    • 以下是一个简单的处理业务逻辑异常并进行重试的代码示例:
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerWithRetry {
    private static final int MAX_RETRIES = 3;

    public static void main(String[] args) {
        // 设置 Kafka 消费者属性
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        // 创建 Kafka 消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // 订阅主题
        consumer.subscribe(Collections.singletonList("test-topic"));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    int retries = 0;
                    boolean success = false;
                    while (retries < MAX_RETRIES &&!success) {
                        try {
                            // 模拟业务逻辑处理
                            System.out.println("Processing message: " + record.value());
                            // 这里可以是实际的业务逻辑,如数据库插入等
                            success = true;
                        } catch (Exception e) {
                            retries++;
                            System.out.println("Retry " + retries + " for message " + record.value() + ": " + e.getMessage());
                        }
                    }
                    if (!success) {
                        // 处理多次重试仍失败的情况,例如发送到死信队列
                        System.out.println("Failed to process message after " + MAX_RETRIES + " retries.");
                    }
                }
                // 手动提交位移
                consumer.commitSync();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}

消费者组与再均衡

  1. 消费者组原理
    • 消费者组是 Kafka 提供的一种多消费者协作消费的机制。一个消费者组内可以有多个消费者实例,它们共同消费一个或多个 Topic 的消息。每个 Partition 在同一时刻只能被一个消费者组内的一个消费者实例消费。这样可以提高消费的并行度,加快消息处理速度。
    • 当消费者组中的某个消费者实例发生故障或者新的消费者实例加入时,会触发再均衡(Rebalance)。再均衡的目的是重新分配 Partition 给消费者实例,确保消费的负载均衡。
  2. 再均衡对消息可靠性的影响及处理
    • 再均衡过程中可能会出现消息重复消费或丢失的情况。例如,在再均衡开始时,消费者实例会停止消费消息,此时如果有新消息到达,可能会在再均衡完成后被重复消费。另外,如果在再均衡过程中,消费者实例还未提交位移就被关闭,可能会导致消息丢失。
    • 为了避免再均衡过程中消息可靠性问题,可以在消费者配置中设置 max.poll.interval.ms 参数,该参数表示消费者在两次调用 poll() 方法之间的最大时间间隔。如果超过这个时间间隔,Kafka 会认为消费者已死亡,触发再均衡。合理设置这个参数可以确保消费者有足够的时间处理完消息并提交位移,从而避免消息丢失。
    • 同时,可以在消费者实现 ConsumerRebalanceListener 接口,在再均衡开始前暂停消费并提交位移,在再均衡完成后重新开始消费。以下是实现 ConsumerRebalanceListener 接口的代码示例:
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.*;

public class KafkaConsumerWithRebalanceListener {
    public static void main(String[] args) {
        // 设置 Kafka 消费者属性
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        // 创建 Kafka 消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // 订阅主题并添加再均衡监听器
        consumer.subscribe(Collections.singletonList("test-topic"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                System.out.println("Partitions revoked: " + partitions);
                // 暂停消费并提交位移
                consumer.commitSync();
                consumer.pause(partitions);
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                System.out.println("Partitions assigned: " + partitions);
                // 重新开始消费
                consumer.resume(partitions);
            }
        });

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Received message: " + record.value());
                    // 处理消息
                    //...
                }
                // 手动提交位移
                consumer.commitSync();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}

通过在生产端、Broker 端和消费端采取上述实用技巧,可以有效保障 Kafka 消息的可靠性,使其在各种复杂的生产环境中稳定运行,满足不同业务场景对消息处理的严格要求。