MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

解析RocketMQ架构的集群搭建与配置

2023-12-097.3k 阅读

RocketMQ架构概述

RocketMQ是一款分布式消息中间件,由阿里巴巴开源,后捐献给Apache并成为顶级项目。它具有低延迟、高并发、高可用、海量消息堆积能力等特点,在电商、金融等众多领域广泛应用。

核心组件

  1. NameServer:NameServer是一个轻量级的元数据服务,主要负责保存Topic和Broker的路由信息。NameServer之间相互独立,不进行数据同步,Broker会定时向每个NameServer上报自己的状态信息。当Producer发送消息或Consumer消费消息时,首先会从NameServer获取Topic的路由信息,从而知道该向哪个Broker发送或从哪个Broker拉取消息。
  2. Broker:Broker是RocketMQ的核心组件,负责存储和转发消息。它接收Producer发送的消息,存储到本地磁盘,并提供Consumer拉取消息的服务。Broker分为Master和Slave两种角色,Master负责写消息和读消息,Slave从Master同步数据,用于在Master故障时提供容灾备份。
  3. Producer:Producer即消息生产者,负责产生并发送消息到Broker。Producer可以根据不同的策略,如轮询、随机等,将消息发送到不同的Broker。同时,Producer支持事务消息,以保证消息发送的最终一致性。
  4. Consumer:Consumer即消息消费者,负责从Broker拉取消息并进行处理。Consumer分为PushConsumer和PullConsumer两种类型,PushConsumer是由Broker主动推送消息给Consumer,而PullConsumer则是Consumer主动从Broker拉取消息。

消息存储结构

RocketMQ的消息存储采用基于文件系统的存储方式,主要由CommitLog、ConsumeQueue和IndexFile组成。

  1. CommitLog:CommitLog是消息的物理存储文件,所有的消息都顺序写入到CommitLog中。这种顺序写的方式极大地提高了写入性能。每个CommitLog文件大小固定为1G,当一个文件写满后,会创建一个新的文件。
  2. ConsumeQueue:ConsumeQueue是消息的逻辑队列,它存储了指向CommitLog中消息的物理偏移量等信息。每个Topic的每个Queue都有一个对应的ConsumeQueue,Consumer通过ConsumeQueue快速定位到CommitLog中的消息。
  3. IndexFile:IndexFile用于为消息建立索引,方便根据消息的Key或时间戳等条件快速查询消息。IndexFile通过哈希表结构存储消息的索引信息,提高了消息查询的效率。

集群搭建

环境准备

  1. 操作系统:推荐使用Linux系统,如CentOS 7或以上版本。本文以CentOS 7为例进行讲解。
  2. Java环境:RocketMQ基于Java开发,需要安装JDK 1.8或以上版本。可以通过以下命令检查Java是否安装:
java -version

如果未安装,可以从Oracle官网下载并安装JDK。安装完成后,配置Java环境变量:

export JAVA_HOME=/usr/local/jdk1.8.0_291
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
  1. 下载RocketMQ:从RocketMQ官方GitHub仓库(https://github.com/apache/rocketmq/releases)下载最新版本的RocketMQ二进制包。本文以RocketMQ 4.9.4版本为例,下载后解压:
tar -zxvf rocketmq-all-4.9.4-bin-release.tar.gz
cd rocketmq-all-4.9.4-bin-release

单Master模式搭建

  1. 启动NameServer:在RocketMQ解压目录下,执行以下命令启动NameServer:
nohup sh bin/mqnamesrv &

启动后,可以通过以下命令查看NameServer日志,确认是否启动成功:

tail -f logs/namesrv.log

如果看到类似“The Name Server boot success”的日志,表示NameServer启动成功。 2. 启动Broker:在RocketMQ解压目录下,编辑conf/broker.conf文件,添加以下配置:

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH

然后执行以下命令启动Broker:

nohup sh bin/mqbroker -c conf/broker.conf &

同样,通过查看Broker日志确认是否启动成功:

tail -f logs/broker.log

如果看到类似“The broker[broker-a, 172.30.30.233:10911] boot success”的日志,表示Broker启动成功。

多Master模式搭建

多Master模式是指集群中有多个Master节点,每个Master节点之间相互独立,没有Slave节点。这种模式的优点是性能高、无单点故障,但在Master节点故障时,可能会丢失部分未同步的消息。

  1. 配置NameServer:与单Master模式相同,启动多个NameServer实例,每个实例监听不同的端口。例如,启动两个NameServer实例,分别监听9876和9877端口:
nohup sh bin/mqnamesrv -p 9876 &
nohup sh bin/mqnamesrv -p 9877 &
  1. 配置Broker:假设搭建两个Master节点,分别为broker-a和broker-b。在RocketMQ解压目录下,复制conf/broker.conf文件,分别命名为conf/broker-a.confconf/broker-b.conf。 编辑conf/broker-a.conf文件,添加以下配置:
brokerClusterName = Cluster-2M
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
namesrvAddr = 192.168.1.100:9876;192.168.1.100:9877

编辑conf/broker-b.conf文件,添加以下配置:

brokerClusterName = Cluster-2M
brokerName = broker-b
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
namesrvAddr = 192.168.1.100:9876;192.168.1.100:9877

其中,namesrvAddr配置为两个NameServer的地址。 3. 启动Broker:分别启动两个Broker:

nohup sh bin/mqbroker -c conf/broker-a.conf &
nohup sh bin/mqbroker -c conf/broker-b.conf &

通过查看日志确认两个Broker是否启动成功。

多Master多Slave模式搭建

多Master多Slave模式是在多Master模式的基础上,为每个Master节点配置一个或多个Slave节点。Slave节点从Master节点同步数据,当Master节点故障时,Slave节点可以切换为Master节点继续提供服务,提高了系统的可用性。

  1. 配置NameServer:与多Master模式相同,启动多个NameServer实例。
  2. 配置Broker:假设搭建两个Master节点(broker-a和broker-b)和两个Slave节点(broker-a-s和broker-b-s)。 编辑conf/broker-a.conf文件,添加以下配置:
brokerClusterName = Cluster-2M-2S
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
namesrvAddr = 192.168.1.100:9876;192.168.1.100:9877

编辑conf/broker-a-s.conf文件,添加以下配置:

brokerClusterName = Cluster-2M-2S
brokerName = broker-a
brokerId = 1
deleteWhen = 04
fileReservedTime = 48
brokerRole = SLAVE
flushDiskType = ASYNC_FLUSH
namesrvAddr = 192.168.1.100:9876;192.168.1.100:9877

编辑conf/broker-b.conf文件,添加以下配置:

brokerClusterName = Cluster-2M-2S
brokerName = broker-b
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
namesrvAddr = 192.168.1.100:9876;192.168.1.100:9877

编辑conf/broker-b-s.conf文件,添加以下配置:

brokerClusterName = Cluster-2M-2S
brokerName = broker-b
brokerId = 1
deleteWhen = 04
fileReservedTime = 48
brokerRole = SLAVE
flushDiskType = ASYNC_FLUSH
namesrvAddr = 192.168.1.100:9876;192.168.1.100:9877
  1. 启动Broker:按照顺序启动Master和Slave节点:
nohup sh bin/mqbroker -c conf/broker-a.conf &
nohup sh bin/mqbroker -c conf/broker-a-s.conf &
nohup sh bin/mqbroker -c conf/broker-b.conf &
nohup sh bin/mqbroker -c conf/broker-b-s.conf &

通过查看日志确认所有Broker是否启动成功。

配置优化

Broker配置优化

  1. 内存配置:Broker默认的堆内存配置可能无法满足高并发场景的需求,可以通过修改runbroker.sh文件来调整堆内存大小。例如,将-Xms和-Xmx设置为4G:
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
  1. 存储配置:根据实际需求调整CommitLog和ConsumeQueue的存储路径,避免存储在系统盘导致空间不足。可以在broker.conf文件中通过storePathCommitLogstorePathConsumeQueue参数进行配置。
  2. 刷盘策略:RocketMQ支持同步刷盘和异步刷盘两种策略。同步刷盘可以保证消息的可靠性,但会降低写入性能;异步刷盘写入性能高,但可能会丢失少量未刷盘的消息。可以根据业务需求在broker.conf文件中通过flushDiskType参数进行配置,如ASYNC_FLUSH表示异步刷盘,SYNC_FLUSH表示同步刷盘。

NameServer配置优化

  1. 内存配置:与Broker类似,可以通过修改runserver.sh文件来调整NameServer的堆内存大小。
  2. 集群配置:在多NameServer集群中,可以合理分配NameServer的负载,避免某个NameServer负载过高。可以通过配置多个NameServer地址,让Producer和Consumer随机选择NameServer进行连接。

代码示例

Producer代码示例(Java)

以下是一个简单的Java Producer示例,用于发送普通消息:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class Producer {
    public static void main(String[] args) throws Exception {
        // 创建Producer实例
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        // 设置NameServer地址
        producer.setNamesrvAddr("192.168.1.100:9876;192.168.1.100:9877");
        // 启动Producer
        producer.start();

        for (int i = 0; i < 10; i++) {
            // 创建消息
            Message message = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes());
            // 发送消息
            SendResult sendResult = producer.send(message);
            System.out.println("SendResult: " + sendResult);
        }

        // 关闭Producer
        producer.shutdown();
    }
}

Consumer代码示例(Java)

以下是一个简单的Java Consumer示例,用于消费消息:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {
    public static void main(String[] args) throws Exception {
        // 创建Consumer实例
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        // 设置NameServer地址
        consumer.setNamesrvAddr("192.168.1.100:9876;192.168.1.100:9877");
        // 订阅Topic
        consumer.subscribe("TopicTest", "*");

        // 注册消息监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("Received message: " + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 启动Consumer
        consumer.start();
        System.out.println("Consumer started.");
    }
}

总结

通过以上步骤,我们详细介绍了RocketMQ架构的集群搭建与配置,并提供了相关的代码示例。在实际应用中,需要根据业务需求和系统规模,合理选择集群模式和进行配置优化,以充分发挥RocketMQ的高性能、高可用等特性。同时,要注意监控和维护集群的运行状态,及时处理可能出现的故障和问题。