MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Kafka 集群搭建与配置要点

2024-10-021.5k 阅读

Kafka 集群搭建基础环境准备

在搭建 Kafka 集群之前,我们需要确保基础环境的配置满足要求。首先,Kafka 是基于 Java 运行的,所以必须安装 Java 运行环境(JRE)或者 Java 开发工具包(JDK)。一般建议安装 JDK,因为 Kafka 内部的一些管理脚本可能依赖 JDK 中的工具。

以 Ubuntu 系统为例,安装 OpenJDK 11 的命令如下:

sudo apt update
sudo apt install openjdk-11-jdk

安装完成后,可以通过以下命令检查 Java 版本:

java -version

确保输出类似于:

openjdk version "11.0.11" 2021-04-20
OpenJDK Runtime Environment (build 11.0.11+9-Ubuntu-0ubuntu2.20.04)
OpenJDK 64-Bit Server VM (build 11.0.11+9-Ubuntu-0ubuntu2.20.04, mixed mode, sharing)

其次,Kafka 依赖于 ZooKeeper 来管理集群元数据、选举领导者等。我们需要安装并配置 ZooKeeper。可以从 Apache ZooKeeper 官网下载安装包,例如下载 zookeeper-3.6.3.tar.gz

下载完成后,解压安装包:

tar -zxvf zookeeper-3.6.3.tar.gz

进入解压后的目录,复制配置文件模板并进行修改:

cd zookeeper-3.6.3
cp conf/zoo_sample.cfg conf/zoo.cfg

编辑 zoo.cfg 文件,配置数据存储目录和日志目录:

dataDir=/var/lib/zookeeper
dataLogDir=/var/log/zookeeper

如果是搭建 ZooKeeper 集群,还需要配置服务器列表。例如,假设我们有三个 ZooKeeper 节点,在 zoo.cfg 中添加如下配置:

server.1=zk1.example.com:2888:3888
server.2=zk2.example.com:2888:3888
server.3=zk3.example.com:2888:3888

这里的 123 是每个节点的唯一标识,需要在每个节点的 dataDir 目录下创建一个名为 myid 的文件,并在其中写入对应的标识号。例如,在 zk1.example.com 节点的 /var/lib/zookeeper/myid 文件中写入 1

启动 ZooKeeper 服务:

bin/zkServer.sh start

可以通过以下命令查看 ZooKeeper 服务状态:

bin/zkServer.sh status

Kafka 安装与配置文件解读

从 Apache Kafka 官网下载 Kafka 安装包,例如 kafka_2.13-2.8.0.tgz。下载完成后,解压安装包:

tar -zxvf kafka_2.13-2.8.0.tgz
cd kafka_2.13-2.8.0

Kafka 的核心配置文件是 config/server.properties,下面详细解读一些重要的配置参数:

broker.id

每个 Kafka 代理(broker)都必须有一个唯一的标识符。在集群环境中,这个值不能重复。例如:

broker.id=0

如果是多节点集群,每个节点的 broker.id 依次递增,如 12 等。

listeners

listeners 配置 Kafka 代理监听的地址和端口。格式为 protocol://host:port。例如:

listeners=PLAINTEXT://:9092

这表示 Kafka 代理监听本地所有网卡的 9092 端口,使用 PLAINTEXT 协议。如果是在生产环境,建议指定具体的 IP 地址,例如:

listeners=PLAINTEXT://192.168.1.100:9092

如果需要配置多个监听地址,可以使用逗号分隔,例如:

listeners=PLAINTEXT://192.168.1.100:9092,SSL://192.168.1.100:9093

advertised.listeners

advertised.listeners 用于告知生产者和消费者 Kafka 代理的地址。当 Kafka 代理部署在云环境或者网络地址转换(NAT)后面时,这个配置非常重要。例如:

advertised.listeners=PLAINTEXT://kafka.example.com:9092

这里的 kafka.example.com 是可以被外部访问的主机名或 IP 地址。

zookeeper.connect

配置 Kafka 连接 ZooKeeper 的地址和端口。例如:

zookeeper.connect=zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181

如果 ZooKeeper 配置了 ACL 认证,还需要在后面添加认证信息,例如:

zookeeper.connect=zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181?zookeeper.sasl.client=true&zookeeper.sasl.login.callback.handler.class=com.example.CustomSaslLoginCallbackHandler

log.dirs

配置 Kafka 日志存储的目录。可以指定多个目录,以逗号分隔,Kafka 会在这些目录间均匀分配日志文件。例如:

log.dirs=/var/lib/kafka-logs

num.partitions

创建主题(topic)时默认的分区数。例如:

num.partitions=3

这个值可以根据实际的负载和性能需求进行调整。

Kafka 集群搭建步骤

假设我们要搭建一个包含三个 Kafka 节点的集群,节点分别为 kafka1.example.comkafka2.example.comkafka3.example.com

  1. 在每个节点上重复安装 Kafka 并配置 按照前面的步骤在每个节点上下载、解压 Kafka 安装包,并修改 config/server.properties 文件。

kafka1.example.com 上,config/server.properties 配置如下:

broker.id=0
listeners=PLAINTEXT://kafka1.example.com:9092
advertised.listeners=PLAINTEXT://kafka1.example.com:9092
zookeeper.connect=zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181
log.dirs=/var/lib/kafka-logs-0

kafka2.example.com 上,config/server.properties 配置如下:

broker.id=1
listeners=PLAINTEXT://kafka2.example.com:9092
advertised.listeners=PLAINTEXT://kafka2.example.com:9092
zookeeper.connect=zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181
log.dirs=/var/lib/kafka-logs-1

kafka3.example.com 上,config/server.properties 配置如下:

broker.id=2
listeners=PLAINTEXT://kafka3.example.com:9092
advertised.listeners=PLAINTEXT://kafka3.example.com:9092
zookeeper.connect=zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181
log.dirs=/var/lib/kafka-logs-2
  1. 启动 Kafka 服务 在每个节点上启动 Kafka 服务:
bin/kafka-server-start.sh config/server.properties &

启动后,可以通过查看日志文件(默认在 logs/server.log)来确认 Kafka 是否正常启动。例如,日志中应该出现类似如下的信息:

[2023-01-01 12:00:00,000] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
  1. 验证 Kafka 集群 可以使用 Kafka 自带的命令行工具来验证集群是否正常工作。例如,创建一个主题:
bin/kafka-topics.sh --create --topic test-topic --bootstrap-server kafka1.example.com:9092,kafka2.example.com:9092,kafka3.example.com:9092 --partitions 3 --replication-factor 3

这里创建了一个名为 test-topic 的主题,包含 3 个分区,每个分区有 3 个副本。

可以通过以下命令查看主题列表:

bin/kafka-topics.sh --list --bootstrap-server kafka1.example.com:9092,kafka2.example.com:9092,kafka3.example.com:9092

应该能看到刚刚创建的 test-topic

接着,可以使用生产者向主题发送消息:

bin/kafka-console-producer.sh --topic test-topic --bootstrap-server kafka1.example.com:9092,kafka2.example.com:9092,kafka3.example.com:9092

在控制台输入消息,例如 Hello, Kafka!,然后按回车键发送。

再使用消费者从主题接收消息:

bin/kafka-console-consumer.sh --topic test-topic --bootstrap-server kafka1.example.com:9092,kafka2.example.com:9092,kafka3.example.com:9092 --from-beginning

应该能看到刚刚生产者发送的消息 Hello, Kafka!

Kafka 集群安全配置要点

在生产环境中,Kafka 集群的安全配置至关重要。以下介绍一些常见的安全配置要点。

身份认证

Kafka 支持多种身份认证机制,如 SASL(Simple Authentication and Security Layer)。以 SASL - PLAIN 为例,配置步骤如下:

  1. 配置 Kafka 代理config/server.properties 文件中添加如下配置:
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
listeners=SASL_PLAINTEXT://:9092
advertised.listeners=SASL_PLAINTEXT://kafka.example.com:9092
security.inter.broker.protocol=SASL_PLAINTEXT
  1. 创建用户和密码 可以使用 kafka-configs.sh 工具创建用户和密码。首先,创建一个包含用户名和密码的文件 users.properties,格式为 username=password,例如:
admin=admin-secret
producer=producer-secret
consumer=consumer-secret

然后使用以下命令创建用户:

bin/kafka-configs.sh --zookeeper zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret]' --entity-type users --entity-name admin
bin/kafka-configs.sh --zookeeper zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181 --alter --add-config 'SCRAM-SHA-256=[password=producer-secret]' --entity-type users --entity-name producer
bin/kafka-configs.sh --zookeeper zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181 --alter --add-config 'SCRAM-SHA-256=[password=consumer-secret]' --entity-type users --entity-name consumer
  1. 配置客户端 如果使用 Java 客户端,在生产者和消费者的配置中添加如下认证信息:
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1.example.com:9092,kafka2.example.com:9092,kafka3.example.com:9092");
props.put("sasl.mechanism", "PLAIN");
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"producer\" password=\"producer-secret\";");

数据加密

Kafka 支持数据在传输过程中的加密,通过 SSL/TLS 协议实现。

  1. 生成证书和密钥 可以使用 keytool 工具生成证书和密钥。例如,生成服务器端证书和密钥:
keytool -keystore kafka.server.keystore.jks -alias kafka -validity 365 -genkey -keyalg RSA

按照提示输入密码、组织信息等。

生成客户端证书和密钥:

keytool -keystore kafka.client.keystore.jks -alias client -validity 365 -genkey -keyalg RSA
  1. 配置 Kafka 代理config/server.properties 文件中添加如下配置:
listeners=SSL://:9093
advertised.listeners=SSL://kafka.example.com:9093
ssl.keystore.location=/path/to/kafka.server.keystore.jks
ssl.keystore.password=keystore-password
ssl.key.password=key-password
ssl.truststore.location=/path/to/kafka.server.truststore.jks
ssl.truststore.password=truststore-password
  1. 配置客户端 在 Java 客户端配置中添加如下信息:
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1.example.com:9093,kafka2.example.com:9093,kafka3.example.com:9093");
props.put("security.protocol", "SSL");
props.put("ssl.keystore.location", "/path/to/kafka.client.keystore.jks");
props.put("ssl.keystore.password", "keystore-password");
props.put("ssl.key.password", "key-password");
props.put("ssl.truststore.location", "/path/to/kafka.client.truststore.jks");
props.put("ssl.truststore.password", "truststore-password");

Kafka 集群性能优化要点

硬件资源优化

  1. CPU Kafka 是一个高并发的系统,对 CPU 性能要求较高。尽量使用多核 CPU,并且确保 CPU 频率稳定。可以通过调整系统的 CPU 调度策略,例如在 Linux 系统中,可以使用 cpupower 工具将 CPU 频率设置为性能模式:
sudo cpupower frequency-set -g performance
  1. 内存 Kafka 的缓存机制依赖内存,合理分配内存对于性能提升非常重要。log.buffer.size 配置了每个分区日志缓存的大小,默认值为 32MB。可以根据实际情况适当增大,例如:
log.buffer.size=64MB

同时,socket.send.buffer.bytessocket.receive.buffer.bytes 分别配置了网络发送和接收缓冲区的大小,也可以适当调整,例如:

socket.send.buffer.bytes=128KB
socket.receive.buffer.bytes=128KB
  1. 磁盘 Kafka 的日志存储在磁盘上,使用高性能的磁盘对于提升写入和读取性能至关重要。建议使用固态硬盘(SSD),并且配置多个磁盘用于 log.dirs,以分散 I/O 负载。同时,调整 log.flush.interval.messageslog.flush.scheduler.interval.ms 等参数,控制日志刷盘的频率。例如:
log.flush.interval.messages=10000
log.flush.scheduler.interval.ms=1000

配置参数优化

  1. 分区和副本 合理设置主题的分区数和副本数对于性能和可靠性都有影响。分区数过多会增加管理开销,过少则会影响并发性能。一般根据预估的负载和 CPU 核心数来确定分区数。例如,对于一个 CPU 为 8 核的服务器,每个分区的合理负载假设为 1000 条消息/秒,如果预估总负载为 8000 条消息/秒,那么可以设置分区数为 8。

副本数的设置要考虑数据的可靠性和存储成本。如果对数据可靠性要求极高,可以设置副本数为 3 或更多,但同时会增加存储开销。

  1. 生产者和消费者配置 在生产者端,acks 参数控制生产者在等待服务器确认之前发送的消息数量。设置 acks=all 可以确保消息的可靠性,但会降低性能;设置 acks=1 可以提高性能,但可能会丢失少量消息。例如:
props.put("acks", "all");

在消费者端,fetch.min.bytesfetch.max.wait.ms 控制消费者每次拉取消息的最小字节数和最大等待时间。可以根据实际情况调整,例如:

props.put("fetch.min.bytes", "1024");
props.put("fetch.max.wait.ms", "500");

Kafka 集群监控与维护

监控指标

  1. 代理指标

    • CPU 使用率:可以通过操作系统的监控工具(如 tophtop)或者 Kafka 自带的 JMX 指标来监控 Kafka 代理的 CPU 使用率。高 CPU 使用率可能表示 Kafka 处理消息的负载过高,需要优化配置或者增加硬件资源。
    • 内存使用率:同样可以通过操作系统工具或者 JMX 指标监控 Kafka 代理的内存使用情况。如果内存使用率持续过高,可能需要调整 Kafka 的内存相关配置,如 log.buffer.size 等。
    • 网络 I/O:使用 ifstat 等工具监控 Kafka 代理的网络输入输出流量。过高的网络流量可能导致消息传输延迟,需要检查网络带宽是否足够。
    • 磁盘 I/O:通过 iostat 等工具监控 Kafka 代理的磁盘 I/O 情况。如果磁盘读写速度过慢,可能影响消息的持久化和读取,需要优化磁盘配置或者更换高性能磁盘。
  2. 主题指标

    • 消息堆积量:可以通过 Kafka 自带的命令行工具或者第三方监控工具(如 Kafka Manager、Prometheus + Grafana)来监控主题的消息堆积量。如果某个主题的消息堆积量持续增加,可能表示消费者处理消息的速度过慢,需要优化消费者代码或者增加消费者实例。
    • 分区 Leader 分布:使用 kafka-topics.sh 工具的 --describe 选项可以查看主题分区的 Leader 分布情况。确保 Leader 均匀分布在各个代理上,避免某个代理成为热点。

维护操作

  1. 主题管理
    • 创建主题:如前面所述,使用 kafka-topics.sh 工具创建主题时,可以指定分区数、副本数等参数。例如,创建一个名为 new - topic 的主题,包含 5 个分区,副本数为 2:
bin/kafka-topics.sh --create --topic new - topic --bootstrap-server kafka1.example.com:9092,kafka2.example.com:9092,kafka3.example.com:9092 --partitions 5 --replication-factor 2
  • 修改主题:可以使用 kafka-topics.sh 工具修改主题的分区数(注意,只能增加分区数,不能减少)。例如,将 new - topic 的分区数增加到 8:
bin/kafka-topics.sh --alter --topic new - topic --bootstrap-server kafka1.example.com:9092,kafka2.example.com:9092,kafka3.example.com:9092 --partitions 8
  • 删除主题:要删除主题,首先需要在 config/server.properties 文件中设置 delete.topic.enable=true,然后使用以下命令删除主题:
bin/kafka-topics.sh --delete --topic new - topic --bootstrap-server kafka1.example.com:9092,kafka2.example.com:9092,kafka3.example.com:9092
  1. 代理管理
    • 添加代理:如果需要扩展 Kafka 集群,添加新的代理节点。首先在新节点上安装 Kafka 并按照集群配置要求修改 config/server.properties 文件,确保 broker.id 唯一且其他配置与现有集群一致。然后启动新的 Kafka 代理,ZooKeeper 会自动将其加入集群。
    • 删除代理:删除代理时,需要先将该代理上的分区 Leader 迁移到其他代理上。可以使用 kafka - preferred - replica - election.sh 工具进行迁移。例如,假设要删除 broker.id=3 的代理:
bin/kafka - preferred - replica - election.sh --zookeeper zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181 --broker-list 3

迁移完成后,停止该代理的 Kafka 服务。

  1. 数据备份与恢复
    • 数据备份:Kafka 的数据备份可以通过定期复制 log.dirs 目录下的日志文件来实现。可以使用脚本结合 rsync 等工具将日志文件备份到其他存储设备。例如,以下脚本可以定期将 Kafka 日志备份到远程服务器:
#!/bin/bash
LOG_DIR=/var/lib/kafka - logs
BACKUP_SERVER=backup.example.com
BACKUP_DIR=/backup/kafka - logs
rsync -avz $LOG_DIR/ $BACKUP_SERVER:$BACKUP_DIR/
  • 数据恢复:如果需要恢复数据,将备份的日志文件复制回 log.dirs 目录,并确保文件权限正确。然后重启 Kafka 服务,Kafka 会自动识别并加载这些日志文件。

通过以上对 Kafka 集群搭建、配置要点、安全配置、性能优化以及监控维护等方面的详细介绍,希望能帮助读者搭建一个稳定、高效、安全的 Kafka 集群,满足各种后端开发场景下的消息队列需求。在实际应用中,还需要根据具体的业务需求和环境特点进行进一步的优化和调整。