RocketMQ 集群搭建与配置优化
RocketMQ 集群搭建
RocketMQ 是一款分布式消息中间件,具有高吞吐量、高可用性等特点。在实际生产环境中,为了保证消息的可靠处理和系统的高可用性,通常需要搭建 RocketMQ 集群。
环境准备
在搭建 RocketMQ 集群之前,需要确保以下环境准备工作:
- 操作系统:推荐使用 Linux 系统,本文以 CentOS 7 为例进行演示。
- JDK:RocketMQ 是基于 Java 开发的,需要安装 JDK 1.8 及以上版本。可以通过以下命令检查 JDK 是否安装并配置正确:
java -version
如果没有安装 JDK,可以从 Oracle 官网下载并按照官方文档进行安装配置。
3. 下载 RocketMQ:从 RocketMQ 官方 GitHub 仓库(https://github.com/apache/rocketmq/releases)下载所需版本的 RocketMQ 二进制包。例如,下载 rocketmq-all-4.9.4-bin-release.zip
,然后解压到指定目录,假设解压到 /opt/rocketmq
:
unzip rocketmq-all-4.9.4-bin-release.zip -d /opt/rocketmq
单 Master 模式集群搭建
单 Master 模式是 RocketMQ 最简单的集群模式,只有一个 Master 节点,适合开发测试环境。
- 配置 Master 节点
进入 RocketMQ 解压目录,编辑
conf/broker.conf
文件,添加以下配置:
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
namesrvAddr = 192.168.1.100:9876 # 替换为实际的 NameServer 地址
上述配置中:
brokerClusterName
定义了集群名称,这里设置为DefaultCluster
。brokerName
定义了 Broker 名称,这里为broker-a
。brokerId
为 Broker 的编号,0 表示 Master 节点。deleteWhen
表示每天删除过期文件的时间,04 表示凌晨 4 点。fileReservedTime
表示文件保留时间,单位为小时,这里设置为 48 小时。brokerRole
定义了 Broker 的角色,ASYNC_MASTER
表示异步 Master。flushDiskType
定义了刷盘策略,ASYNC_FLUSH
表示异步刷盘。namesrvAddr
定义了 NameServer 的地址和端口。
- 启动 NameServer 在 RocketMQ 解压目录下执行以下命令启动 NameServer:
nohup sh bin/mqnamesrv &
启动后,可以通过查看 logs/namesrv.log
文件来确认 NameServer 是否正常启动:
tail -f logs/namesrv.log
如果看到类似 The Name Server boot success. serializeType=JSON
的日志,表示 NameServer 启动成功。
- 启动 Broker 在 RocketMQ 解压目录下执行以下命令启动 Broker:
nohup sh bin/mqbroker -c conf/broker.conf &
同样,通过查看 logs/broker.log
文件来确认 Broker 是否正常启动:
tail -f logs/broker.log
如果看到类似 The broker[broker-a, 192.168.1.101:10911] boot success. serializeType=JSON and name server is 192.168.1.100:9876
的日志,表示 Broker 启动成功。
多 Master 模式集群搭建
多 Master 模式下,集群中有多个 Master 节点,每个 Master 节点都可以读写消息,提高了系统的可用性和吞吐量。
- 配置 Master 节点
假设要搭建两个 Master 节点,分别为
broker-a
和broker-b
。在 RocketMQ 解压目录下创建两个配置文件conf/broker-a.conf
和conf/broker-b.conf
。
conf/broker-a.conf
配置如下:
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
namesrvAddr = 192.168.1.100:9876,192.168.1.101:9876 # 多个 NameServer 地址,用逗号分隔
listenPort = 10911
conf/broker-b.conf
配置如下:
brokerClusterName = DefaultCluster
brokerName = broker-b
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
namesrvAddr = 192.168.1.100:9876,192.168.1.101:9876
listenPort = 10921
注意:每个 Master 节点的 listenPort
要设置为不同的值,避免端口冲突。
- 启动 NameServer 在每个 NameServer 所在的服务器上执行以下命令启动 NameServer:
nohup sh bin/mqnamesrv &
分别在两个 NameServer 的日志文件 logs/namesrv.log
中确认启动成功。
- 启动 Broker
在
broker-a
所在的服务器上执行以下命令启动broker-a
:
nohup sh bin/mqbroker -c conf/broker-a.conf &
在 broker-b
所在的服务器上执行以下命令启动 broker-b
:
nohup sh bin/mqbroker -c conf/broker-b.conf &
分别查看 broker-a
和 broker-b
的日志文件 logs/broker.log
确认启动成功。
多 Master 多 Slave 模式集群搭建
多 Master 多 Slave 模式在多 Master 模式的基础上,为每个 Master 节点配置了 Slave 节点,进一步提高了系统的可用性。当 Master 节点出现故障时,Slave 节点可以切换为 Master 节点继续提供服务。
- 配置 Master 节点
以两个 Master 节点
broker-a
和broker-b
为例,配置文件与多 Master 模式类似,但需要注意brokerRole
和flushDiskType
等配置。
conf/broker-a.conf
配置如下:
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = SYNC_MASTER
flushDiskType = SYNC_FLUSH
namesrvAddr = 192.168.1.100:9876,192.168.1.101:9876
listenPort = 10911
conf/broker-b.conf
配置如下:
brokerClusterName = DefaultCluster
brokerName = broker-b
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = SYNC_MASTER
flushDiskType = SYNC_FLUSH
namesrvAddr = 192.168.1.100:9876,192.168.1.101:9876
listenPort = 10921
这里将 brokerRole
设置为 SYNC_MASTER
,flushDiskType
设置为 SYNC_FLUSH
,表示同步 Master 和同步刷盘,以保证数据的可靠性。
- 配置 Slave 节点
为每个 Master 节点配置一个 Slave 节点,分别创建
conf/broker-a-s.conf
和conf/broker-b-s.conf
。
conf/broker-a-s.conf
配置如下:
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 1
deleteWhen = 04
fileReservedTime = 48
brokerRole = SLAVE
flushDiskType = ASYNC_FLUSH
namesrvAddr = 192.168.1.100:9876,192.168.1.101:9876
listenPort = 11011
conf/broker-b-s.conf
配置如下:
brokerClusterName = DefaultCluster
brokerName = broker-b
brokerId = 1
deleteWhen = 04
fileReservedTime = 48
brokerRole = SLAVE
flushDiskType = ASYNC_FLUSH
namesrvAddr = 192.168.1.100:9876,192.168.1.101:9876
listenPort = 11021
注意:Slave 节点的 brokerId
要与对应的 Master 节点不同,且 listenPort
也要设置为不同的值。
-
启动 NameServer 与多 Master 模式相同,在每个 NameServer 所在的服务器上启动 NameServer。
-
启动 Master 和 Slave 节点 按照顺序分别启动 Master 和 Slave 节点。在
broker-a
所在的服务器上启动broker-a
:
nohup sh bin/mqbroker -c conf/broker-a.conf &
在 broker-a
的 Slave 节点所在服务器上启动 broker-a-s
:
nohup sh bin/mqbroker -c conf/broker-a-s.conf &
同理,在 broker-b
及其 Slave 节点所在服务器上启动相应的 Broker。通过查看各个节点的日志文件确认启动成功。
RocketMQ 配置优化
在搭建好 RocketMQ 集群后,为了让其在生产环境中更好地运行,需要对一些关键配置进行优化。
内存配置优化
RocketMQ 的 Broker 进程在处理消息时需要占用大量内存,合理配置内存可以提高性能和稳定性。
- 调整堆内存大小
在启动脚本
bin/runbroker.sh
中,可以调整堆内存大小。例如,将初始堆内存和最大堆内存都设置为 4G:
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
-Xms4g
设置初始堆内存为 4G。-Xmx4g
设置最大堆内存为 4G。-Xmn2g
设置新生代大小为 2G,老生代大小为 2G(4G - 2G)。-XX:MetaspaceSize=128m
设置元空间初始大小为 128M。-XX:MaxMetaspaceSize=320m
设置元空间最大大小为 320M。
- 直接内存配置
RocketMQ 使用直接内存(Direct Memory)来提高 I/O 性能。在
conf/broker.conf
中可以配置直接内存大小:
mapedFileSizeCommitLog = 1073741824 # 1G,CommitLog 文件大小
mapedFileSizeConsumeQueue = 30000000 # 约 28.6M,ConsumeQueue 文件大小
mapedFileSizeCommitLog
定义了 CommitLog 文件的大小,建议根据实际消息量和磁盘空间进行调整。如果消息量较大,可以适当增大该值。mapedFileSizeConsumeQueue
定义了 ConsumeQueue 文件的大小,一般不需要过大调整。
网络配置优化
- TCP 参数调整
在 Linux 系统中,可以通过修改
/etc/sysctl.conf
文件来调整 TCP 参数,提高网络性能。例如:
net.core.rmem_max = 16777216
net.core.wmem_max = 16777216
net.ipv4.tcp_rmem = 4096 87380 16777216
net.ipv4.tcp_wmem = 4096 65536 16777216
net.ipv4.tcp_syncookies = 1
net.ipv4.tcp_tw_reuse = 1
net.ipv4.tcp_tw_recycle = 0
net.ipv4.tcp_fin_timeout = 30
net.core.rmem_max
和net.core.wmem_max
设置接收和发送缓冲区的最大大小。net.ipv4.tcp_rmem
和net.ipv4.tcp_wmem
设置 TCP 接收和发送缓冲区的动态调整范围。net.ipv4.tcp_syncookies
开启 SYN Cookies,防止 SYN Flood 攻击。net.ipv4.tcp_tw_reuse
允许重用处于 TIME-WAIT 状态的 socket。net.ipv4.tcp_tw_recycle
关闭 TCP 连接快速回收,避免在 NAT 环境下出现问题。net.ipv4.tcp_fin_timeout
设置 FIN-WAIT-2 状态的超时时间。
修改完成后,执行以下命令使配置生效:
sysctl -p
- 优化 NameServer 网络 NameServer 作为 RocketMQ 集群的元数据管理中心,其网络性能也很关键。可以通过增加 NameServer 的网络带宽、优化网络拓扑等方式来提高其性能。同时,建议将 NameServer 部署在高性能的服务器上,并配置多个 NameServer 节点以提高可用性。
刷盘策略优化
RocketMQ 支持同步刷盘和异步刷盘两种策略,不同的策略对性能和数据可靠性有不同的影响。
- 同步刷盘
同步刷盘策略下,Broker 在接收到消息后,会将消息写入磁盘并确保写入成功后才返回成功响应给生产者。这种策略保证了数据的可靠性,但性能相对较低。在
conf/broker.conf
中设置同步刷盘:
brokerRole = SYNC_MASTER
flushDiskType = SYNC_FLUSH
- 异步刷盘
异步刷盘策略下,Broker 在接收到消息后,会先将消息写入内存缓冲区,然后由后台线程异步将缓冲区中的消息写入磁盘。这种策略性能较高,但在系统故障时可能会丢失部分未刷盘的消息。在
conf/broker.conf
中设置异步刷盘:
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
在实际生产环境中,需要根据业务对数据可靠性和性能的要求来选择合适的刷盘策略。如果对数据可靠性要求极高,如金融业务,建议使用同步刷盘;如果对性能要求较高,且能容忍一定程度的数据丢失,可以使用异步刷盘。
消息存储优化
-
选择合适的存储介质 RocketMQ 的消息存储依赖于磁盘,选择高性能的存储介质可以显著提高消息存储和读取的性能。例如,使用 SSD 硬盘代替传统的机械硬盘,SSD 具有更快的读写速度,可以减少 I/O 延迟。
-
优化存储目录布局 合理规划 RocketMQ 的存储目录布局可以提高 I/O 性能。可以将 CommitLog、ConsumeQueue 和 IndexFile 等存储文件分别放在不同的磁盘分区上,避免 I/O 竞争。例如:
storePathRootDir = /data/rocketmq/store
storePathCommitLog = /data/rocketmq/commitlog
storePathConsumeQueue = /data/rocketmq/consumequeue
storePathIndex = /data/rocketmq/index
在 conf/broker.conf
中通过上述配置指定不同的存储路径。
RocketMQ 代码示例
以下是使用 Java 语言通过 RocketMQ 客户端进行消息发送和接收的代码示例。
引入依赖
在 pom.xml
文件中引入 RocketMQ 客户端依赖:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.4</version>
</dependency>
消息发送示例
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class Producer {
public static void main(String[] args) throws Exception {
// 创建生产者实例
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
// 设置 NameServer 地址
producer.setNamesrvAddr("192.168.1.100:9876;192.168.1.101:9876");
// 启动生产者
producer.start();
for (int i = 0; i < 10; i++) {
// 创建消息
Message message = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes("UTF-8"));
// 发送消息
SendResult sendResult = producer.send(message);
System.out.printf("%s%n", sendResult);
}
// 关闭生产者
producer.shutdown();
}
}
上述代码创建了一个 RocketMQ 生产者实例,设置了 NameServer 地址并启动。然后循环发送 10 条消息到名为 TopicTest
的主题中,最后关闭生产者。
消息接收示例
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws Exception {
// 创建消费者实例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
// 设置 NameServer 地址
consumer.setNamesrvAddr("192.168.1.100:9876;192.168.1.101:9876");
// 订阅主题和标签
consumer.subscribe("TopicTest", "TagA");
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("Received message: " + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
System.out.println("Consumer started.");
}
}
上述代码创建了一个 RocketMQ 消费者实例,设置了 NameServer 地址并订阅了 TopicTest
主题下的 TagA
标签。然后注册了一个消息监听器,在监听器中处理接收到的消息,最后启动消费者。
通过以上代码示例,可以了解如何使用 RocketMQ 客户端进行简单的消息发送和接收操作。在实际应用中,还需要根据业务需求进行更复杂的配置和处理,如事务消息、顺序消息等。同时,结合前面介绍的集群搭建和配置优化知识,可以构建一个高性能、高可用的 RocketMQ 消息系统。