MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Kafka 架构与 Spring 集成实战

2023-02-203.7k 阅读

Kafka 架构概述

Kafka 是一种分布式流平台,最初由 LinkedIn 开发,后捐赠给 Apache 软件基金会。它设计用于处理大量实时数据,具有高吞吐量、低延迟、可扩展性和容错性等特性,被广泛应用于日志收集、消息传递、流处理等场景。

Kafka 核心组件

  1. Broker:Kafka 集群由多个 Broker 组成,每个 Broker 是一个 Kafka 服务器实例。它们负责接收生产者发送的消息,存储消息,并为消费者提供消息服务。Broker 通过 Zookeeper 进行协调和管理,Zookeeper 记录了 Kafka 集群的元数据信息,如主题(Topic)、分区(Partition)和副本(Replica)的分布等。
  2. Topic:主题是 Kafka 中消息的逻辑分类,类似数据库中的表。生产者将消息发送到特定的 Topic,消费者从 Topic 中订阅并获取消息。每个 Topic 可以被分为多个 Partition,以实现并行处理和提高吞吐量。
  3. Partition:分区是 Topic 的物理分片,每个 Topic 可以有一个或多个分区。分区分布在不同的 Broker 上,这样可以实现数据的分布式存储和并行处理。每个分区中的消息是有序的,这对于一些需要顺序处理消息的场景非常重要。例如,在处理订单数据时,保证同一订单的消息按顺序处理可以避免数据不一致问题。
  4. Replica:副本是分区的备份,用于保证数据的可靠性和容错性。每个分区可以有多个副本,其中一个副本被指定为 Leader,其他副本为 Follower。生产者发送的消息首先到达 Leader 副本,Follower 副本从 Leader 副本同步数据。当 Leader 副本所在的 Broker 发生故障时,会从 Follower 副本中选举出一个新的 Leader,以确保服务的可用性。
  5. Producer:生产者是向 Kafka 集群发送消息的客户端应用程序。生产者将消息发送到指定的 Topic,Kafka 支持多种消息发送模式,如同步发送、异步发送和批量发送等。生产者可以根据具体需求选择合适的发送模式,以平衡性能和可靠性。例如,在对性能要求较高且对消息可靠性要求相对较低的场景下,可以选择异步发送模式;而在对消息可靠性要求极高的场景下,应选择同步发送模式并确保消息成功发送。
  6. Consumer:消费者是从 Kafka 集群获取消息的客户端应用程序。消费者通过订阅 Topic 来获取消息,Kafka 支持两种消费模式:基于队列的消费模式和基于发布 - 订阅的消费模式。在基于队列的消费模式下,多个消费者组成一个消费组(Consumer Group),每个消费组内的消费者平均分配 Topic 的分区,以实现并行消费;在基于发布 - 订阅的消费模式下,每个消费者独立获取 Topic 的所有消息。

Kafka 架构原理

  1. 消息存储:Kafka 将消息以追加的方式写入磁盘,采用分段日志(Segmented Log)的方式进行管理。每个分区的日志被分为多个日志段(Log Segment),每个日志段由一个索引文件和一个数据文件组成。索引文件记录了消息在数据文件中的偏移量,通过这种方式可以快速定位消息。例如,当消费者需要获取某个偏移量的消息时,可以通过索引文件快速找到对应的消息在数据文件中的位置。这种基于磁盘的存储方式虽然看起来不如内存存储快,但由于 Kafka 采用了顺序写和零拷贝等技术,使得其在处理大量数据时具有很高的性能。
  2. 消息传递:生产者将消息发送到 Kafka 集群时,首先会根据 Topic 找到对应的分区。如果采用默认的分区策略(如轮询策略),生产者会将消息依次发送到各个分区。消息到达 Broker 后,会先写入 Leader 副本的日志文件,然后 Follower 副本从 Leader 副本同步数据。消费者从 Kafka 集群获取消息时,会向 Broker 发送拉取请求,Broker 根据消费者的请求返回相应的消息。消费者会记录自己消费的偏移量,以便下次从上次消费的位置继续消费。
  3. 高可用性:Kafka 通过多副本机制来实现高可用性。当某个 Broker 发生故障时,Zookeeper 会检测到并触发 Leader 选举。选举过程中,会从 Follower 副本中选择一个作为新的 Leader。由于 Follower 副本会不断从 Leader 副本同步数据,因此新的 Leader 可以继续提供服务,保证消息的正常读写。同时,Kafka 还支持数据的自动复制和恢复,确保数据不会丢失。

Spring 与 Kafka 集成基础

Spring 是一个广泛应用的 Java 开发框架,提供了丰富的功能和便捷的开发方式。Spring for Apache Kafka 是 Spring 框架对 Kafka 的集成支持,使得在 Spring 项目中使用 Kafka 变得更加简单和高效。

引入依赖

在 Spring 项目中集成 Kafka,首先需要在 pom.xml 文件中引入相关依赖。如果使用 Maven 构建项目,可以添加以下依赖:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.7.2</version>
</dependency>

上述依赖引入了 Spring for Apache Kafka 的核心库,版本号可以根据实际需求进行调整。同时,还需要确保项目中引入了 Kafka 客户端依赖,Spring for Apache Kafka 会自动依赖 Kafka 客户端库,但如果项目中有版本冲突等特殊情况,可能需要手动管理 Kafka 客户端依赖。

配置 Kafka 连接

在 Spring 项目中,通常通过配置文件来设置 Kafka 的连接信息。可以在 application.propertiesapplication.yml 文件中进行配置。以 application.yml 为例:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: my-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
  • bootstrap-servers:指定 Kafka 集群的地址和端口,这里假设 Kafka 运行在本地的 9092 端口。如果是集群部署,需要列出多个 Broker 的地址和端口,以逗号分隔。
  • consumer.group-id:设置消费者组 ID,同一消费者组内的消费者会平均分配 Topic 的分区进行消费。不同消费者组的消费者会独立消费 Topic 的所有消息。
  • consumer.auto-offset-reset:指定消费者在没有初始偏移量或偏移量无效时的行为。earliest 表示从 Topic 的起始位置开始消费,latest 表示从 Topic 的最新位置开始消费。
  • consumer.key-deserializerconsumer.value-deserializer:指定消费者用于反序列化消息键和值的类。这里使用 StringDeserializer 表示消息的键和值都是字符串类型。
  • producer.key-serializerproducer.value-serializer:指定生产者用于序列化消息键和值的类。同样使用 StringSerializer 表示消息的键和值都是字符串类型。如果消息的键和值是自定义对象,需要自定义序列化和反序列化器。

Kafka 生产者集成实战

创建 Kafka 生产者配置类

在 Spring 项目中,可以通过创建配置类来定制 Kafka 生产者的配置。以下是一个简单的 Kafka 生产者配置类示例:

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // 可以添加更多配置,如acks、retries等
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

在上述配置类中:

  • producerFactory 方法创建了一个 ProducerFactory 对象,通过 configProps 配置了 Kafka 服务器地址、键和值的序列化器。ProducerFactory 用于创建 Kafka 生产者实例。
  • kafkaTemplate 方法创建了一个 KafkaTemplate 对象,它是 Spring 提供的用于发送消息到 Kafka 的模板类,依赖于 ProducerFactoryKafkaTemplate 封装了 Kafka 生产者的操作,提供了简单易用的发送消息方法。

发送消息

有了 Kafka 生产者配置后,就可以在服务类中使用 KafkaTemplate 发送消息。以下是一个简单的示例:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerService {

    private static final String TOPIC = "my-topic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message) {
        kafkaTemplate.send(TOPIC, message);
    }
}

在上述代码中:

  • KafkaProducerService 是一个 Spring 服务类,通过 @Autowired 注入了 KafkaTemplate
  • sendMessage 方法接收一个字符串消息,并使用 kafkaTemplate.send 方法将消息发送到名为 my - topic 的 Topic 中。kafkaTemplate.send 方法有多个重载形式,可以指定消息的键、分区等信息。如果不指定键和分区,Kafka 会使用默认的分区策略将消息发送到某个分区。

Kafka 消费者集成实战

创建 Kafka 消费者配置类

与生产者类似,也需要创建一个 Kafka 消费者配置类来定制消费者的配置。以下是一个示例:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // 可以设置并发数等其他属性
        return factory;
    }
}

在上述配置类中:

  • consumerFactory 方法创建了一个 ConsumerFactory 对象,通过 props 配置了 Kafka 服务器地址、消费者组 ID、偏移量重置策略以及键和值的反序列化器。ConsumerFactory 用于创建 Kafka 消费者实例。
  • kafkaListenerContainerFactory 方法创建了一个 ConcurrentKafkaListenerContainerFactory 对象,它是 Spring 用于创建并发消费者监听器容器的工厂类。通过设置 consumerFactory,该工厂类可以创建多个消费者实例来并行消费消息。可以通过该工厂类设置并发数等属性,以提高消费效率。

接收消息

配置好消费者后,就可以创建一个消费者监听器来接收消息。以下是一个示例:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumerListener {

    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void receive(String message) {
        System.out.println("Received message: " + message);
    }
}

在上述代码中:

  • KafkaConsumerListener 是一个 Spring 组件类,通过 @KafkaListener 注解标记了一个方法 receive@KafkaListener 注解指定了要监听的 Topic 为 my - topic,消费者组为 my - group。当有消息发送到 my - topic 且属于 my - group 消费者组时,receive 方法会被调用,参数 message 即为接收到的消息。在实际应用中,可以在 receive 方法中进行业务逻辑处理,如数据持久化、调用其他服务等。

自定义序列化与反序列化

在实际应用中,消息的键和值可能是自定义对象,而不是简单的字符串。这时需要自定义序列化和反序列化器。

自定义序列化器

假设我们有一个自定义的 User 类:

import java.io.Serializable;

public class User implements Serializable {
    private String name;
    private int age;

    public User(String name, int age) {
        this.name = name;
        this.age = age;
    }

    public String getName() {
        return name;
    }

    public int getAge() {
        return age;
    }
}

然后创建一个自定义的序列化器:

import org.apache.kafka.common.serialization.Serializer;
import java.nio.ByteBuffer;
import java.util.Map;

public class UserSerializer implements Serializer<User> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // 可以在这里进行一些初始化配置
    }

    @Override
    public byte[] serialize(String topic, User data) {
        if (data == null) {
            return null;
        }
        byte[] nameBytes = data.getName().getBytes();
        int nameLength = nameBytes.length;
        ByteBuffer buffer = ByteBuffer.allocate(4 + nameLength);
        buffer.putInt(nameLength);
        buffer.put(nameBytes);
        buffer.putInt(data.getAge());
        return buffer.array();
    }

    @Override
    public void close() {
        // 关闭资源
    }
}

在上述 UserSerializer 中:

  • configure 方法用于初始化配置,可以在其中设置一些序列化相关的参数。
  • serialize 方法将 User 对象序列化为字节数组。首先将用户名的长度和用户名本身写入字节缓冲区,然后写入用户的年龄。
  • close 方法用于关闭资源,如关闭文件流等,在这个简单示例中暂时没有实际操作。

自定义反序列化器

与序列化器对应,创建一个自定义的反序列化器:

import org.apache.kafka.common.serialization.Deserializer;
import java.nio.ByteBuffer;
import java.util.Map;

public class UserDeserializer implements Deserializer<User> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // 可以在这里进行一些初始化配置
    }

    @Override
    public User deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        ByteBuffer buffer = ByteBuffer.wrap(data);
        int nameLength = buffer.getInt();
        byte[] nameBytes = new byte[nameLength];
        buffer.get(nameBytes);
        String name = new String(nameBytes);
        int age = buffer.getInt();
        return new User(name, age);
    }

    @Override
    public void close() {
        // 关闭资源
    }
}

在上述 UserDeserializer 中:

  • configure 方法同样用于初始化配置。
  • deserialize 方法将字节数组反序列化为 User 对象。首先从字节缓冲区中读取用户名的长度,然后读取用户名,最后读取用户的年龄并创建 User 对象。
  • close 方法用于关闭资源。

使用自定义序列化与反序列化器

在生产者和消费者配置中使用自定义的序列化和反序列化器。在生产者配置类中修改 producerFactory 方法:

import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<User, User> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, UserSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, UserSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<User, User> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

在消费者配置类中修改 consumerFactory 方法:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<User, User> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, UserDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, UserDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<User, User> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<User, User> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

在上述配置修改后,生产者会使用 UserSerializerUser 对象序列化为字节数组发送到 Kafka,消费者会使用 UserDeserializer 将接收到的字节数组反序列化为 User 对象。这样就实现了自定义对象在 Kafka 中的消息传递。

事务与 Kafka 集成

在一些场景下,需要保证消息的发送和消费的原子性,这就需要使用 Kafka 的事务功能。Spring for Apache Kafka 提供了对 Kafka 事务的支持。

配置事务

首先在生产者配置类中启用事务支持:

import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
        kafkaTemplate.setTransactionIdPrefix("my-transactional-id-");
        return kafkaTemplate;
    }

    @Bean
    public PlatformTransactionManager transactionManager() {
        return new KafkaTransactionManager<>(producerFactory());
    }
}

在上述配置中:

  • producerFactory 方法中,通过 configProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my - transactional - id"); 设置了事务 ID。事务 ID 必须是唯一的,用于标识一个事务。
  • kafkaTemplate 方法中,通过 kafkaTemplate.setTransactionIdPrefix("my - transactional - id - "); 设置了事务 ID 前缀。这个前缀与 ProducerConfig.TRANSACTIONAL_ID_CONFIG 中的值相关联。
  • transactionManager 方法创建了一个 KafkaTransactionManager 对象,它是 Spring 用于管理 Kafka 事务的事务管理器。

使用事务发送消息

在生产者服务类中使用事务发送消息:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class KafkaProducerService {

    private static final String TOPIC = "my-topic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Transactional
    public void sendMessage(String message) {
        kafkaTemplate.executeInTransaction(operations -> {
            operations.send(TOPIC, message);
            // 可以在这里发送多条消息
            return true;
        });
    }
}

在上述代码中:

  • sendMessage 方法使用了 @Transactional 注解,表示该方法是一个事务性方法。
  • kafkaTemplate.executeInTransaction 方法在事务中执行消息发送操作。在这个方法的回调中,可以发送多条消息,这些消息要么全部成功发送,要么全部失败回滚,从而保证了消息发送的原子性。

事务与消费者

在消费者端,也可以结合事务来确保消息消费的一致性。例如,在消费消息后进行一些数据库操作,如果数据库操作失败,可以回滚事务,使得消息不会被标记为已消费,从而可以重新消费。这需要在消费者配置中进行一些设置,并在消费者监听器中处理事务。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

@Component
public class KafkaConsumerListener {

    @Autowired
    private SomeDatabaseService databaseService;

    @KafkaListener(topics = "my-topic", groupId = "my-group", containerFactory = "kafkaListenerContainerFactory")
    @Transactional
    public void receive(String message, Acknowledgment acknowledgment) {
        try {
            // 进行数据库操作
            databaseService.saveData(message);
            // 手动提交偏移量,只有在数据库操作成功后才提交
            acknowledgment.acknowledge();
        } catch (Exception e) {
            // 如果数据库操作失败,回滚事务,消息不会被标记为已消费
            throw new RuntimeException("Database operation failed", e);
        }
    }
}

在上述代码中:

  • KafkaConsumerListener 类中的 receive 方法使用了 @Transactional 注解。
  • 在方法中,首先进行数据库操作 databaseService.saveData(message),如果操作成功,通过 acknowledgment.acknowledge() 手动提交偏移量,标记消息为已消费;如果数据库操作失败,抛出异常,事务回滚,消息不会被标记为已消费,会被重新消费。

Kafka 分区与 Spring 集成

Kafka 的分区机制对于提高系统的性能和可扩展性非常重要。在 Spring 与 Kafka 集成中,可以对分区进行一些定制和操作。

自定义分区策略

假设我们有一个需求,根据消息的某个属性(如用户 ID)将消息发送到特定的分区。首先创建一个自定义的分区器:

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

import java.util.List;
import java.util.Map;

public class CustomPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (key == null) {
            return Utils.toPositive(Utils.murmur2(valueBytes)) % numPartitions;
        } else {
            // 假设key是用户ID,根据用户ID取模分配到特定分区
            String userId = (String) key;
            int userIdHash = Utils.toPositive(Utils.murmur2(userId.getBytes()));
            return userIdHash % numPartitions;
        }
    }

    @Override
    public void close() {
        // 关闭资源
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // 配置分区器
    }
}

在上述 CustomPartitioner 中:

  • partition 方法根据消息的键(这里假设为用户 ID)计算哈希值,并通过取模操作将消息分配到特定的分区。如果键为 null,则根据消息的值计算哈希值进行分区。
  • close 方法用于关闭资源,在这个简单示例中暂时没有实际操作。
  • configure 方法用于配置分区器,可以在其中设置一些参数。

使用自定义分区器

在生产者配置类中使用自定义分区器:

import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

在上述配置中,通过 configProps.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class); 将自定义的分区器 CustomPartitioner 配置到生产者工厂中。这样,生产者在发送消息时会根据自定义的分区策略将消息发送到相应的分区。

消费者与分区

在消费者端,Spring 提供了一些方式来处理分区相关的操作。例如,可以在消费者监听器中获取当前消费的分区信息:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumerListener {

    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void receive(ConsumerRecord<String, String> record) {
        System.out.println("Received message from partition: " + record.partition());
        System.out.println("Message: " + record.value());
    }
}

在上述代码中,ConsumerRecord 对象包含了消息的各种元数据,通过 record.partition() 可以获取当前消息所在的分区。在实际应用中,可以根据分区信息进行一些特定的处理,如对不同分区的数据进行不同的统计分析等。

Kafka 监控与 Spring 集成

监控 Kafka 集群的运行状态对于保证系统的稳定性和性能非常重要。Spring Boot Actuator 可以与 Kafka 集成,提供一些监控指标。

引入监控依赖

pom.xml 文件中引入 Spring Boot Actuator 相关依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

同时,如果使用 Kafka 客户端的监控,还需要引入 Kafka 客户端的监控依赖:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>

配置监控

application.yml 文件中配置监控相关参数:

management:
  endpoints:
    web:
      exposure:
        include: '*'

上述配置允许暴露所有的 Actuator 端点,通过访问 /actuator 路径可以查看所有可用的监控端点。

Kafka 监控指标

Spring Boot Actuator 提供了一些与 Kafka 相关的监控指标,如:

  • kafka.consumer.records-per-second:消费者每秒消费的记录数。
  • kafka.producer.records-per-second:生产者每秒发送的记录数。
  • kafka.producer.request-latency:生产者请求的延迟时间。

可以通过访问 /actuator/metrics 端点查看所有的监控指标,或者通过 /actuator/metrics/{metric - name} 查看特定的指标,如 /actuator/metrics/kafka.consumer.records - per - second

此外,还可以结合 Grafana 和 Prometheus 等工具对 Kafka 的监控指标进行可视化展示和报警,以更好地监控 Kafka 集群的运行状态。

通过以上对 Kafka 架构与 Spring 集成的详细介绍和实战示例,希望能帮助读者深入理解并在实际项目中有效地应用 Kafka 和 Spring 进行后端开发,实现高效、可靠的消息传递和流处理系统。在实际应用中,还需要根据具体的业务需求和场景对 Kafka 和 Spring 的配置进行优化和调整,以达到最佳的性能和可靠性。