MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

RocketMQ安装与集群搭建实战

2023-10-036.3k 阅读

一、环境准备

在开始安装和搭建 RocketMQ 集群之前,我们需要确保一些基础环境的准备。

1.1 操作系统

RocketMQ 支持多种操作系统,包括 Linux、Windows 等。在生产环境中,Linux 是较为常用的选择。本文以 CentOS 7 为例进行讲解。 确保系统已安装并配置好网络,更新系统软件包到最新版本,执行以下命令:

yum update

1.2 JDK 安装

RocketMQ 是基于 Java 开发的,所以需要安装 Java Development Kit(JDK)。可以从 Oracle 官网下载 JDK 安装包,也可以使用系统自带的包管理器进行安装。以 CentOS 7 为例,通过 yum 安装 OpenJDK 1.8:

yum install java-1.8.0-openjdk-devel

安装完成后,配置 JAVA_HOME 环境变量。编辑 /etc/profile 文件,添加以下内容:

export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.312.b07-1.el7_9.x86_64
export PATH=$JAVA_HOME/bin:$PATH

然后执行 source /etc/profile 使配置生效。通过 java -version 命令验证 JDK 是否安装成功,应显示类似如下信息:

openjdk version "1.8.0_312"
OpenJDK Runtime Environment (build 1.8.0_312-b07)
OpenJDK 64-Bit Server VM (build 25.312-b07, mixed mode)

1.3 下载 RocketMQ

访问 RocketMQ 官方 GitHub 仓库(https://github.com/apache/rocketmq/releases),下载适合的版本。截至目前,最新版本为 5.1.0。以该版本为例,在 Linux 系统上使用 wget 命令下载:

wget https://archive.apache.org/dist/rocketmq/5.1.0/rocketmq-all-5.1.0-bin-release.zip

下载完成后,解压压缩包:

unzip rocketmq-all-5.1.0-bin-release.zip
cd rocketmq-all-5.1.0-bin-release

二、单机安装 RocketMQ

在搭建集群之前,先进行单机安装,以便熟悉 RocketMQ 的基本使用。

2.1 启动 NameServer

RocketMQ 的 NameServer 是一个轻量级的元数据服务器,负责存储 Topic、Broker 等元数据信息。进入 RocketMQ 解压目录,执行以下命令启动 NameServer:

nohup sh bin/mqnamesrv &

上述命令使用 nohup 使进程在后台运行,即使终端关闭也不会影响其运行。启动成功后,可以在 logs/namesrv.log 文件中查看日志信息,确认 NameServer 已正常启动,日志中会显示类似如下信息:

The Name Server boot success. serializeType=JSON

2.2 启动 Broker

Broker 是 RocketMQ 的核心组件,负责接收、存储和转发消息。在启动 Broker 之前,需要根据实际情况修改配置文件。进入 conf 目录,有多个配置文件示例,这里以 broker.conf 为例。 编辑 broker.conf 文件,添加或修改以下关键配置:

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
namesrvAddr = 127.0.0.1:9876

其中,brokerClusterName 表示集群名称,brokerName 为 Broker 的名称,brokerId 为 Broker 的 ID,0 表示 Master,大于 0 表示 Slave。deleteWhen 表示删除文件的时间,fileReservedTime 表示文件保留时间(单位为小时)。brokerRole 表示 Broker 的角色,ASYNC_MASTER 为异步 Master,SYNC_MASTER 为同步 Master,SLAVE 为 Slave。flushDiskType 表示刷盘策略,ASYNC_FLUSH 为异步刷盘,SYNC_FLUSH 为同步刷盘。namesrvAddr 为 NameServer 的地址和端口。

修改完配置后,执行以下命令启动 Broker:

nohup sh bin/mqbroker -c conf/broker.conf &

启动成功后,在 logs/broker.log 文件中查看日志,确认 Broker 已正常启动,日志中会显示类似如下信息:

The broker[broker-a, 172.30.30.234:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876

三、RocketMQ 集群搭建

RocketMQ 集群搭建有多种模式,这里以一主一从异步复制模式为例进行讲解。

3.1 规划集群节点

假设我们有两台服务器,IP 分别为 192.168.1.100192.168.1.101。在 192.168.1.100 上部署 Master Broker 和 NameServer,在 192.168.1.101 上部署 Slave Broker 和 NameServer。

3.2 配置 NameServer

在两台服务器上分别下载并解压 RocketMQ,进入解压目录。在 192.168.1.100192.168.1.101 上分别启动 NameServer:

nohup sh bin/mqnamesrv &

启动成功后,在两台服务器的 logs/namesrv.log 文件中确认 NameServer 已正常启动。

3.3 配置 Master Broker

192.168.1.100 的 RocketMQ 解压目录下,编辑 conf/broker.conf 文件,添加或修改以下配置:

brokerClusterName = ClusterOne
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
namesrvAddr = 192.168.1.100:9876;192.168.1.101:9876

注意,namesrvAddr 配置了两台 NameServer 的地址。修改完成后,启动 Master Broker:

nohup sh bin/mqbroker -c conf/broker.conf &

3.4 配置 Slave Broker

192.168.1.101 的 RocketMQ 解压目录下,编辑 conf/broker.conf 文件,添加或修改以下配置:

brokerClusterName = ClusterOne
brokerName = broker-a
brokerId = 1
deleteWhen = 04
fileReservedTime = 48
brokerRole = SLAVE
flushDiskType = ASYNC_FLUSH
namesrvAddr = 192.168.1.100:9876;192.168.1.101:9876

这里 brokerId 为 1,表示 Slave Broker。配置完成后,启动 Slave Broker:

nohup sh bin/mqbroker -c conf/broker.conf &

启动成功后,在两台服务器的 logs/broker.log 文件中确认 Master 和 Slave Broker 已正常启动并完成同步。

四、RocketMQ 生产者和消费者代码示例

为了验证 RocketMQ 集群搭建成功,并且展示如何使用 RocketMQ 进行消息的生产和消费,下面给出基于 Java 的代码示例。

4.1 引入依赖

在 Maven 项目的 pom.xml 文件中引入 RocketMQ 客户端依赖:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>5.1.0</version>
</dependency>

4.2 生产者代码

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class Producer {
    public static void main(String[] args) throws Exception {
        // 创建生产者实例
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        // 设置 NameServer 地址
        producer.setNamesrvAddr("192.168.1.100:9876;192.168.1.101:9876");
        // 启动生产者
        producer.start();

        for (int i = 0; i < 10; i++) {
            // 创建消息实例
            Message message = new Message("TopicTest", ("Hello RocketMQ " + i).getBytes());
            // 发送消息
            SendResult sendResult = producer.send(message);
            System.out.println("SendResult: " + sendResult);
        }

        // 关闭生产者
        producer.shutdown();
    }
}

4.3 消费者代码

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {
    public static void main(String[] args) throws Exception {
        // 创建消费者实例
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        // 设置 NameServer 地址
        consumer.setNamesrvAddr("192.168.1.100:9876;192.168.1.101:9876");
        // 订阅主题
        consumer.subscribe("TopicTest", "*");

        // 注册消息监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("Received message: " + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 启动消费者
        consumer.start();
        System.out.println("Consumer started.");
    }
}

五、常见问题及解决方法

在安装和搭建 RocketMQ 集群过程中,可能会遇到一些问题,下面列举一些常见问题及解决方法。

5.1 Broker 启动失败

  • 问题描述:执行 mqbroker 命令后,Broker 未能正常启动,broker.log 中显示错误信息。
  • 解决方法
    • 检查配置文件 broker.conf 是否正确,特别是 namesrvAddrbrokerId 等关键配置。
    • 确保系统资源充足,如内存、磁盘空间等。RocketMQ 对内存要求较高,默认启动参数可能需要根据实际情况调整。可以通过修改 bin/runbroker.sh 文件中的 JAVA_OPT 来调整内存参数。
    • 检查防火墙设置,确保 NameServer 和 Broker 之间的通信端口(9876、10911、10909 等)没有被防火墙阻挡。可以通过 iptables -L 命令查看防火墙规则,或者临时关闭防火墙进行测试:systemctl stop firewalld

5.2 生产者发送消息失败

  • 问题描述:生产者代码运行后,控制台输出发送失败的错误信息。
  • 解决方法
    • 确认生产者配置的 namesrvAddr 与集群实际的 NameServer 地址一致。
    • 检查 Topic 是否存在。可以通过 RocketMQ 自带的工具或者管理控制台来创建 Topic。
    • 检查生产者所在机器与 RocketMQ 集群的网络连通性,确保能够正常通信。

5.3 消费者接收不到消息

  • 问题描述:消费者启动后,没有接收到生产者发送的消息。
  • 解决方法
    • 确认消费者配置的 namesrvAddr 和订阅的 Topic 与生产者一致。
    • 检查消费者的消费模式(如集群消费、广播消费)是否符合需求。如果是集群消费,确保消费者组内的实例分布在不同的机器上,避免重复消费。
    • 查看 Broker 日志,确认消息是否已成功存储。如果消息未成功存储,可能是 Broker 配置或者磁盘等问题导致。

六、优化与调优

为了使 RocketMQ 集群在生产环境中高效稳定运行,需要进行一些优化与调优。

6.1 内存优化

RocketMQ 的 Broker 进程对内存要求较高,合理配置内存参数可以提高性能。在 bin/runbroker.sh 文件中,可以修改 JAVA_OPT 来调整堆内存大小。例如,增加堆内存到 4GB:

JAVA_OPT="${JAVA_OPT} -Xms4g -Xmx4g -Xmn2g"

-Xms 表示初始堆大小,-Xmx 表示最大堆大小,-Xmn 表示新生代大小。根据实际情况合理调整这些参数,可以减少垃圾回收次数,提高系统性能。

6.2 刷盘策略优化

RocketMQ 支持同步刷盘和异步刷盘两种策略。同步刷盘保证数据的可靠性,但性能相对较低;异步刷盘性能较高,但在系统故障时可能会丢失少量数据。在生产环境中,需要根据业务对数据可靠性的要求来选择合适的刷盘策略。 如果选择异步刷盘,可以通过调整 broker.conf 中的 flushDiskTypeASYNC_FLUSH。同时,可以适当调整 flushIntervalDisk(刷盘间隔时间,单位为毫秒)和 commitLogDispatcherBuildDispatchResourceMaxAwaitTime(刷盘线程等待资源的最大时间,单位为毫秒)等参数来优化性能。

6.3 网络优化

为了减少网络延迟和提高网络吞吐量,可以进行以下网络优化:

  • 调整 TCP 参数,如 tcp_tw_reusetcp_tw_recycletcp_fin_timeout 等,以加快 TCP 连接的回收和重用。可以通过修改 /etc/sysctl.conf 文件并执行 sysctl -p 使配置生效。
  • 确保服务器之间的网络带宽充足,避免网络瓶颈。可以通过 iperf 等工具进行网络带宽测试。

七、监控与管理

为了及时了解 RocketMQ 集群的运行状态,需要对其进行监控和管理。

7.1 使用 RocketMQ Console

RocketMQ Console 是一个可视化的管理工具,可以方便地查看集群拓扑、Topic 信息、消息消费情况等。可以从 GitHub 上下载 RocketMQ Console 的源代码并自行编译部署,也可以使用已编译好的二进制包。 下载并解压 RocketMQ Console 二进制包后,编辑 application.properties 文件,配置 NameServer 地址:

rocketmq.config.namesrvAddr=192.168.1.100:9876;192.168.1.101:9876

然后执行以下命令启动 RocketMQ Console:

nohup java -jar rocketmq-console-ng-2.0.0.jar &

启动成功后,通过浏览器访问 http://服务器IP:8080,即可看到 RocketMQ Console 的管理界面。

7.2 自定义监控指标

除了使用 RocketMQ Console,还可以通过自定义监控指标来更深入地了解 RocketMQ 集群的运行情况。RocketMQ 提供了丰富的监控接口,可以通过 JMX 或者 HTTP 接口获取监控数据。 例如,通过 JMX 获取 Broker 的消息堆积量:

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import java.util.HashMap;
import java.util.Map;

public class RocketMQMonitor {
    public static void main(String[] args) throws Exception {
        String jmxUrl = "service:jmx:rmi:///jndi/rmi://192.168.1.100:9999/jmxrmi";
        JMXServiceURL serviceURL = new JMXServiceURL(jmxUrl);
        JMXConnector jmxConnector = JMXConnectorFactory.connect(serviceURL, new HashMap<>());
        MBeanServerConnection mBeanServerConnection = jmxConnector.getMBeanServerConnection();

        ObjectName objectName = new ObjectName("org.apache.rocketmq:type=BrokerController,brokerName=broker-a,class=BrokerController");
        Long queueSize = (Long) mBeanServerConnection.getAttribute(objectName, "messageTotalInQueue");
        System.out.println("Message queue size: " + queueSize);

        jmxConnector.close();
    }
}

通过这种方式,可以获取到诸如消息发送速率、消费速率、堆积量等关键指标,从而对集群进行更细致的监控和调优。

通过以上详细的步骤,我们完成了 RocketMQ 的安装、集群搭建、代码示例展示、常见问题解决、优化调优以及监控管理等内容,希望能帮助你在后端开发中更好地应用 RocketMQ 消息队列。