MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Kafka 架构 Consumer 消费消息流程

2023-11-232.2k 阅读

Kafka Consumer 基础概念

Kafka 作为一种高性能、分布式的消息队列系统,Consumer(消费者)在其中扮演着至关重要的角色。Consumer 负责从 Kafka 的主题(Topic)中读取消息并进行相应处理。在 Kafka 中,Consumer 是以 Consumer Group(消费者组)的形式存在的。一个 Consumer Group 可以包含一个或多个 Consumer 实例,它们共同协作消费一个或多个 Topic 的消息。

每个 Topic 被划分为多个 Partition(分区),这种设计使得 Kafka 能够实现高吞吐量和水平扩展。Consumer Group 中的每个 Consumer 实例负责消费一个或多个 Partition 中的消息。这样的机制保证了在一个 Consumer Group 内,每个 Partition 只会被一个 Consumer 实例消费,从而避免了消息的重复消费,同时也实现了消费的负载均衡。

例如,假设有一个 Topic 包含 3 个 Partition,而一个 Consumer Group 中有 2 个 Consumer 实例。那么,其中一个 Consumer 实例可能会负责消费 2 个 Partition 的消息,另一个 Consumer 实例负责消费剩下的 1 个 Partition 的消息。

Kafka Consumer 消费流程概述

  1. 初始化阶段
    • 当一个 Kafka Consumer 启动时,首先需要进行配置。配置项包括 Kafka 集群的地址(bootstrap.servers)、Consumer Group 的 ID(group.id)、消息的反序列化器(key.deserializer 和 value.deserializer)等。
    • 以 Java 语言为例,使用 Kafka 的官方客户端库,初始化代码如下:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  • 在这个初始化代码中,我们设置了 Kafka 集群地址为本地的 localhost:9092,Consumer Group ID 为 test - group,并指定了消息的键和值的反序列化器为 StringDeserializer,这意味着我们期望接收到的消息的键和值都是字符串类型。
  1. 订阅主题
    • Consumer 初始化完成后,需要订阅一个或多个 Topic。可以通过 subscribe 方法来实现。
    • 继续上面的 Java 代码示例:
consumer.subscribe(Arrays.asList("test - topic"));
  • 这里,我们让 Consumer 订阅了名为 test - topic 的主题。如果需要订阅多个主题,可以将主题名称以列表的形式传递给 subscribe 方法。
  1. 协调器分配分区
    • Kafka 引入了 Consumer Coordinator(消费者协调器)来管理 Consumer Group。当 Consumer 订阅主题后,Consumer Coordinator 会为 Consumer Group 中的各个 Consumer 实例分配 Partition。
    • 分配过程如下:
      • 首先,Consumer Coordinator 会收集 Consumer Group 中所有活跃的 Consumer 实例的信息。
      • 然后,根据 Partition 的数量和 Consumer 实例的数量,使用一定的分配策略(如 Range 策略或 Round - Robin 策略)来分配 Partition。
      • 例如,使用 Range 分配策略时,假设一个 Topic 有 10 个 Partition,Consumer Group 中有 3 个 Consumer 实例。那么,第一个 Consumer 实例可能会分配到 Partition 0 - 3,第二个 Consumer 实例分配到 Partition 4 - 6,第三个 Consumer 实例分配到 Partition 7 - 9。
  2. 拉取消息
    • Consumer 实例在分配到 Partition 后,就开始从对应的 Partition 拉取消息。Consumer 采用的是拉(Pull)模式,而不是推(Push)模式。这意味着 Consumer 主动向 Kafka Broker 发送拉取请求,Broker 根据请求返回相应的消息。
    • 拉取消息的过程是循环进行的。在 Java 客户端中,代码如下:
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
} finally {
    consumer.close();
}
  • 在这段代码中,consumer.poll 方法用于拉取消息。poll 方法的参数 Duration.ofMillis(100) 表示等待拉取消息的最长时间为 100 毫秒。如果在这个时间内有新消息到达,poll 方法会返回一批消息(ConsumerRecords)。Consumer 会遍历这批消息并进行相应处理,这里简单地将消息的偏移量(offset)、键(key)和值(value)打印出来。
  1. 消息处理
    • Consumer 拉取到消息后,会按照业务需求进行处理。处理逻辑可以是非常多样化的,例如将消息写入数据库、进行数据转换、调用其他服务等。
    • 以将消息写入 MySQL 数据库为例,假设我们使用 JDBC 连接 MySQL,代码如下:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// 在处理消息的循环中添加如下代码
try (Connection conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "password")) {
    String sql = "INSERT INTO messages (key, value) VALUES (?,?)";
    try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
        for (ConsumerRecord<String, String> record : records) {
            pstmt.setString(1, record.key());
            pstmt.setString(2, record.value());
            pstmt.executeUpdate();
        }
    }
} catch (SQLException e) {
    e.printStackTrace();
}
  • 在这段代码中,我们首先通过 JDBC 连接到本地的 MySQL 数据库,然后定义了一个 SQL 插入语句,将消息的键和值插入到名为 messages 的表中。
  1. 提交偏移量
    • Consumer 在处理完消息后,需要向 Kafka 提交消息的偏移量(offset)。偏移量表示 Consumer 在 Partition 中已经消费到的位置。提交偏移量有两种方式:自动提交和手动提交。
    • 自动提交
      • 可以通过配置 enable.auto.committrue 来开启自动提交,并且可以通过 auto.commit.interval.ms 配置自动提交的时间间隔。例如:
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "5000");
 - 这里设置了每 5000 毫秒(5 秒)自动提交一次偏移量。自动提交的优点是简单方便,但也存在一定风险。如果在自动提交偏移量之前 Consumer 发生故障,可能会导致部分消息被重复消费。
  • 手动提交
    • 手动提交偏移量可以更精确地控制消息的消费位置。在 Java 客户端中,可以使用 commitSynccommitAsync 方法。
    • commitSync 方法是同步提交,会阻塞当前线程直到偏移量提交成功。例如:
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
        consumer.commitSync();
    }
} finally {
    consumer.close();
}
 - `commitAsync` 方法是异步提交,不会阻塞当前线程。它有一个回调函数,可以在提交完成后执行相应逻辑。例如:
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
        consumer.commitAsync((offsets, exception) -> {
            if (exception!= null) {
                System.err.println("Commit failed for offsets " + offsets);
            }
        });
    }
} finally {
    consumer.close();
}
 - 手动提交偏移量的优点是可以在消息处理完成后再提交,确保消息不会被重复消费,但需要开发者更加小心地处理提交逻辑,避免出现偏移量提交失败等问题。

Kafka Consumer 高级特性与原理

  1. 再均衡(Rebalance)
    • 再均衡是 Kafka Consumer 中的一个重要机制。当 Consumer Group 中的成员发生变化(如新增 Consumer 实例、某个 Consumer 实例故障退出),或者 Topic 的 Partition 数量发生变化时,Kafka 会触发再均衡。
    • 再均衡的过程
      • 加入组(Join Group):当一个新的 Consumer 实例启动并加入到 Consumer Group 时,它会向 Consumer Coordinator 发送 JoinGroup 请求。Consumer Coordinator 会收集所有发送 JoinGroup 请求的 Consumer 实例的信息。
      • 分配方案(Sync Group):Consumer Coordinator 根据收集到的 Consumer 实例信息和 Partition 信息,使用选定的分配策略生成新的 Partition 分配方案。然后,它会向每个 Consumer 实例发送 SyncGroup 请求,其中包含新的分配方案。
      • 执行分配:Consumer 实例接收到 SyncGroup 请求后,根据分配方案开始消费对应的 Partition。
    • 例如,假设一个 Consumer Group 中有 3 个 Consumer 实例,突然其中一个 Consumer 实例发生故障。此时,Consumer Coordinator 会检测到成员变化,触发再均衡。它会重新分配 Partition,使得剩下的 2 个 Consumer 实例能够消费所有的 Partition。
    • 再均衡虽然能够保证 Consumer Group 的正常工作,但也会带来一些问题。在再均衡期间,Consumer 实例会停止消费消息,可能会导致短暂的数据处理中断。为了减少再均衡的影响,可以尽量保持 Consumer Group 的成员稳定,避免频繁地添加或移除 Consumer 实例。
  2. 消费者拦截器(Consumer Interceptor)
    • Kafka Consumer 提供了消费者拦截器机制,允许开发者在消息被消费之前或偏移量提交之前对消息进行自定义处理。
    • 实现消费者拦截器
      • 首先,需要实现 ConsumerInterceptor 接口。该接口包含两个主要方法:onConsumeonCommit
      • onConsume 方法在消息被 Consumer 处理之前调用,可以用于对消息进行过滤、修改等操作。例如,我们可以实现一个简单的拦截器,只允许消费消息值长度大于 10 的消息:
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MyConsumerInterceptor implements ConsumerInterceptor<String, String> {
    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        List<ConsumerRecord<String, String>> filteredRecords = new ArrayList<>();
        for (ConsumerRecord<String, String> record : records) {
            if (record.value().length() > 10) {
                filteredRecords.add(record);
            }
        }
        return new ConsumerRecords<>(new HashMap<TopicPartition, List<ConsumerRecord<String, String>>>() {{
            putAll(records.partitions().stream().collect(HashMap::new,
                    (m, p) -> m.put(p, new ArrayList<>()),
                    HashMap::putAll));
            records.partitions().forEach(p -> {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(p);
                partitionRecords.forEach(r -> {
                    if (r.value().length() > 10) {
                        get(p).add(r);
                    }
                });
            });
        }});
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        // 可以在此处进行偏移量提交前的处理,如记录日志等
    }

    @Override
    public void close() {
        // 关闭拦截器时的清理操作
    }

    @Override
    public void configure(Map<String,?> configs) {
        // 配置拦截器
    }
}
 - 然后,在初始化 Consumer 时,将拦截器添加到配置中:
props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, Arrays.asList(MyConsumerInterceptor.class.getName()));
  • 通过消费者拦截器,我们可以实现一些通用的消息处理逻辑,如数据过滤、日志记录等,而不需要在每个 Consumer 的业务逻辑中重复编写。
  1. Consumer 与 Kafka Broker 的交互协议
    • Kafka Consumer 与 Broker 之间通过 Kafka 协议进行通信。主要的请求类型有 FetchRequest(拉取消息请求)和 OffsetCommitRequest(提交偏移量请求)等。
    • FetchRequest
      • Consumer 向 Broker 发送 FetchRequest 来拉取消息。FetchRequest 中包含了要拉取的 Topic、Partition 信息,以及期望拉取的消息偏移量、最大字节数等参数。
      • Broker 接收到 FetchRequest 后,会根据请求中的参数从相应的 Partition 中读取消息,并以 FetchResponse 的形式返回给 Consumer。FetchResponse 中包含了拉取到的消息、下一次拉取的偏移量等信息。
    • OffsetCommitRequest
      • 当 Consumer 提交偏移量时,会向 Broker 发送 OffsetCommitRequestOffsetCommitRequest 中包含了 Consumer Group ID、Topic、Partition 以及要提交的偏移量等信息。
      • Broker 接收到 OffsetCommitRequest 后,会将偏移量记录下来,以便在 Consumer 重启或发生再均衡时,能够知道从哪里继续消费消息。
    • 了解这些交互协议对于深入理解 Kafka Consumer 的工作原理以及排查可能出现的问题非常有帮助。例如,当 Consumer 拉取不到消息时,可以检查 FetchRequestFetchResponse 的相关参数,看是否存在配置错误或 Broker 端的问题。

Kafka Consumer 性能优化与调优

  1. 批量拉取与处理
    • 为了提高 Consumer 的性能,可以增大每次拉取消息的批量大小。在 poll 方法中,可以通过设置合适的参数来控制批量大小。例如,增加 max.poll.records 配置项的值,该配置项表示每次 poll 方法最多返回的消息数量。
    • 代码示例:
props.put("max.poll.records", "1000");
  • 这样设置后,每次 poll 方法调用最多会返回 1000 条消息。批量拉取可以减少 Consumer 与 Broker 之间的网络交互次数,从而提高整体性能。同时,在处理消息时,也可以采用批量处理的方式。例如,在将消息写入数据库时,可以使用批量插入语句:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// 在处理消息的循环中添加如下代码
try (Connection conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "password")) {
    String sql = "INSERT INTO messages (key, value) VALUES (?,?)";
    try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
        for (ConsumerRecord<String, String> record : records) {
            pstmt.setString(1, record.key());
            pstmt.setString(2, record.value());
            pstmt.addBatch();
        }
        pstmt.executeBatch();
    }
} catch (SQLException e) {
    e.printStackTrace();
}
  • 通过批量插入,减少了数据库的写入操作次数,提高了数据写入效率。
  1. 合理设置线程数
    • 如果 Consumer 的处理逻辑比较复杂,单个线程处理消息可能会成为性能瓶颈。此时,可以考虑使用多线程来处理消息。在 Kafka Consumer 中,可以在 Consumer 实例外创建多个线程来处理 poll 方法返回的消息。
    • 例如,我们可以创建一个线程池来处理消息:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

ExecutorService executorService = Executors.newFixedThreadPool(10);
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            executorService.submit(() -> {
                // 处理消息的逻辑
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            });
        }
    }
} finally {
    executorService.shutdown();
    consumer.close();
}
  • 在这段代码中,我们创建了一个固定大小为 10 的线程池,每个接收到的消息会提交到线程池中由不同的线程进行处理。这样可以充分利用多核 CPU 的性能,提高消息处理的速度。但需要注意的是,多线程处理也会带来一些问题,如线程安全问题、资源竞争等,需要开发者仔细处理。
  1. 优化网络配置
    • Kafka Consumer 与 Broker 之间的网络通信对性能有重要影响。可以通过调整一些网络相关的配置来优化性能。
    • socket.send.buffer.bytes 和 socket.receive.buffer.bytes:这两个配置项分别表示发送和接收缓冲区的大小。适当增大这两个值可以提高网络传输效率。例如:
props.put("socket.send.buffer.bytes", "102400");
props.put("socket.receive.buffer.bytes", "102400");
  • connections.max.idle.ms:该配置项表示连接在空闲状态下的最大存活时间。如果设置过小,可能会导致频繁的连接重建,影响性能;如果设置过大,可能会浪费资源。可以根据实际情况进行调整,例如设置为 30000(30 秒):
props.put("connections.max.idle.ms", "30000");
  • 通过优化网络配置,可以减少网络延迟和带宽消耗,提高 Kafka Consumer 的整体性能。

Kafka Consumer 故障处理与容错机制

  1. Consumer 实例故障
    • 当一个 Consumer 实例发生故障时,Kafka 的 Consumer Coordinator 会检测到该实例的心跳停止(Consumer 实例会定期向 Consumer Coordinator 发送心跳来表明自己处于活跃状态)。
    • 故障处理过程
      • Consumer Coordinator 会将故障的 Consumer 实例从 Consumer Group 中移除。
      • 然后,Consumer Coordinator 会触发再均衡,重新分配 Partition 给其他活跃的 Consumer 实例。
      • 例如,假设一个 Consumer Group 中有 3 个 Consumer 实例,其中一个实例由于内存溢出故障退出。Consumer Coordinator 会在检测到心跳停止后,将其移除,并重新为剩下的 2 个 Consumer 实例分配 Partition,以保证所有的 Partition 都能被正常消费。
    • 为了提高 Consumer 的容错能力,可以增加 Consumer Group 中的 Consumer 实例数量,形成一定的冗余。这样,当某个 Consumer 实例发生故障时,其他实例能够及时接管其负责的 Partition,减少数据处理中断的时间。
  2. 偏移量提交故障
    • 在提交偏移量时,可能会出现提交失败的情况。例如,网络故障、Broker 故障等都可能导致偏移量提交失败。
    • 处理偏移量提交故障
      • 如果使用自动提交偏移量,当提交失败时,Kafka 客户端会根据配置的重试次数进行重试。可以通过 retries 配置项来设置重试次数,默认值为 2147483647(Integer.MAX_VALUE)。例如:
props.put("retries", "3");
 - 如果使用手动提交偏移量,在 `commitSync` 方法提交失败时,会抛出异常,开发者可以在捕获异常后进行重试。例如:
boolean committed = false;
int retryCount = 0;
while (!committed && retryCount < 3) {
    try {
        consumer.commitSync();
        committed = true;
    } catch (CommitFailedException e) {
        retryCount++;
        System.err.println("Commit failed, retry attempt " + retryCount);
    }
}
 - 在这段代码中,我们手动提交偏移量,如果提交失败,会进行最多 3 次重试。通过合理处理偏移量提交故障,可以保证消息消费的准确性,避免消息的重复消费或漏消费。

3. 消息处理故障

  • 在处理消息时,可能会由于业务逻辑错误、外部服务调用失败等原因导致消息处理故障。
  • 处理消息处理故障
    • 可以采用重试机制。例如,当调用外部服务失败时,可以设置一定的重试次数和重试间隔。以调用 HTTP 服务为例:
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

int retryCount = 0;
boolean success = false;
while (!success && retryCount < 3) {
    try {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
              .uri(URI.create("http://example.com/api"))
              .build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() == 200) {
            success = true;
        }
    } catch (IOException | InterruptedException e) {
        retryCount++;
        try {
            Thread.sleep(1000);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }
}
 - 另外,对于一些无法通过重试解决的故障,可以将消息发送到一个专门的死信队列(Dead - Letter Queue,DLQ)中。在 Kafka 中,可以创建一个新的 Topic 作为 DLQ,当消息处理失败时,将消息发送到该 Topic 中,以便后续进行分析和处理。
KafkaProducer<String, String> dlqProducer = new KafkaProducer<>(dlqProps);
try {
    // 处理消息失败时
    ProducerRecord<String, String> dlqRecord = new ProducerRecord<>("dlq - topic", record.key(), record.value());
    dlqProducer.send(dlqRecord).get();
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
} finally {
    dlqProducer.close();
}
  • 通过这些措施,可以有效地处理消息处理过程中的故障,保证 Kafka Consumer 的稳定性和可靠性。