MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

RocketMQ 基于 Docker 的部署与实践

2021-06-213.1k 阅读

一、RocketMQ 简介

RocketMQ 是一款由阿里巴巴开源的分布式消息中间件,具有高吞吐量、高可用性、低延迟等特点。它在阿里巴巴内部经过多年的实践检验,广泛应用于电商、金融等众多领域。RocketMQ 基于发布 - 订阅模型,能够有效地解耦应用程序之间的依赖关系,实现异步通信和流量削峰等功能。

二、Docker 基础概述

Docker 是一种开源的应用容器引擎,它允许开发者将应用程序及其依赖打包到一个可移植的容器中,然后发布到任何支持 Docker 的服务器上运行。Docker 容器具有轻量级、可移植、隔离性强等优点,极大地简化了应用程序的部署和管理过程。通过使用 Docker,我们可以快速搭建和部署 RocketMQ 环境,提高开发和运维效率。

三、基于 Docker 部署 RocketMQ

(一)环境准备

  1. 安装 Docker:首先确保你的服务器安装了 Docker。以 Ubuntu 系统为例,可以通过以下命令安装:
sudo apt-get update
sudo apt-get install docker.io
  1. 安装 Docker Compose:Docker Compose 用于定义和运行多个 Docker 容器的应用程序。安装命令如下:
sudo curl -L "https://github.com/docker/compose/releases/download/1.29.2/docker - compose - $(uname - s)-$(uname - m)" -o /usr/local/bin/docker - compose
sudo chmod +x /usr/local/bin/docker - compose

(二)创建 RocketMQ 配置文件

  1. 创建目录:在服务器上创建一个目录用于存放 RocketMQ 的配置文件,例如 /root/rocketmq - config
mkdir /root/rocketmq - config
  1. broker 配置文件:在 rocketmq - config 目录下创建 broker.conf 文件,内容如下:
brokerClusterName = DefaultCluster
brokerName = broker - a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
  1. namesrv 配置文件:可以保持默认配置,不需要额外创建配置文件。

(三)编写 Docker Compose 文件

rocketmq - config 目录下创建 docker - compose.yml 文件,内容如下:

version: '3'
services:
  namesrv:
    image: apache/rocketmq - namesrv:4.9.3
    container_name: rocketmq - namesrv
    ports:
      - 9876:9876
    volumes:
      - /root/rocketmq - store/namesrv/logs:/home/rocketmq/logs
      - /root/rocketmq - store/namesrv/store:/home/rocketmq/store
    command: sh mqnamesrv
  broker:
    image: apache/rocketmq - broker:4.9.3
    container_name: rocketmq - broker
    ports:
      - 10909:10909
      - 10911:10911
    volumes:
      - /root/rocketmq - store/broker/logs:/home/rocketmq/logs
      - /root/rocketmq - store/broker/store:/home/rocketmq/store
      - /root/rocketmq - config/broker.conf:/home/rocketmq/conf/broker.conf
    environment:
      - NAMESRV_ADDR=rocketmq - namesrv:9876
      - JAVA_OPTS=-Duser.home=/home/rocketmq -Xms1g -Xmx1g -Drocketmq.namesrv.addr=rocketmq - namesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false
    command: sh mqbroker -c /home/rocketmq/conf/broker.conf
  rocketmq - console:
    image: styletang/rocketmq - console - ng:1.0.0
    container_name: rocketmq - console
    ports:
      - 8080:8080
    environment:
      - JAVA_OPTS=-Drocketmq.namesrv.addr=rocketmq - namesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false

上述配置文件定义了三个服务:

  • namesrv:RocketMQ 的名称服务器,负责注册和发现 broker 节点。
  • broker:RocketMQ 的消息代理服务器,负责存储和转发消息。
  • rocketmq - console:RocketMQ 的控制台,用于管理和监控 RocketMQ 集群。

(四)启动 RocketMQ 服务

rocketmq - config 目录下执行以下命令启动 RocketMQ 服务:

docker - compose up -d

启动后,可以通过 docker ps 命令查看容器状态,确保所有容器都正常运行。

四、RocketMQ 实践操作

(一)生产者代码示例(Java)

  1. 引入依赖:在 Maven 项目的 pom.xml 文件中添加 RocketMQ 客户端依赖:
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq - client</artifactId>
    <version>4.9.3</version>
</dependency>
  1. 生产者代码
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class Producer {
    public static void main(String[] args) throws Exception {
        // 创建生产者实例
        DefaultMQProducer producer = new DefaultMQProducer("producer_group");
        // 设置 NameServer 地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        // 启动生产者
        producer.start();

        for (int i = 0; i < 10; i++) {
            // 创建消息实例
            Message message = new Message("TopicTest", ("Hello RocketMQ " + i).getBytes());
            // 发送消息
            SendResult sendResult = producer.send(message);
            System.out.printf("%s%n", sendResult);
        }

        // 关闭生产者
        producer.shutdown();
    }
}

上述代码创建了一个 RocketMQ 生产者,向名为 TopicTest 的主题发送 10 条消息。

(二)消费者代码示例(Java)

  1. 同样引入依赖:与生产者相同,在 pom.xml 文件中添加 RocketMQ 客户端依赖。
  2. 消费者代码
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {
    public static void main(String[] args) throws Exception {
        // 创建消费者实例
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
        // 设置 NameServer 地址
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // 订阅主题
        consumer.subscribe("TopicTest", "*");

        // 注册消息监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("收到消息:" + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 启动消费者
        consumer.start();
        System.out.println("消费者已启动");
    }
}

上述代码创建了一个 RocketMQ 消费者,订阅 TopicTest 主题,并在接收到消息时打印消息内容。

五、RocketMQ 高级特性实践

(一)事务消息

  1. 事务消息概念:事务消息是 RocketMQ 提供的一种特殊消息类型,用于保证分布式事务的最终一致性。它通过两阶段提交的方式,将消息的发送和本地事务的执行绑定在一起。
  2. 事务生产者代码示例
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

public class TransactionProducer {
    public static void main(String[] args) throws Exception {
        TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_group");
        producer.setNamesrvAddr("127.0.0.1:9876");

        producer.setTransactionListener(new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                // 执行本地事务
                System.out.println("执行本地事务:" + new String(msg.getBody()));
                // 模拟本地事务成功
                return LocalTransactionState.COMMIT_MESSAGE;
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                // 检查本地事务状态
                System.out.println("检查本地事务:" + new String(msg.getBody()));
                // 模拟本地事务成功
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });

        producer.start();

        Message message = new Message("TransactionTopic", "事务消息内容".getBytes());
        SendResult sendResult = producer.sendMessageInTransaction(message, null);
        System.out.println(sendResult);

        producer.shutdown();
    }
}

上述代码创建了一个事务消息生产者,在发送消息时会执行本地事务,并在必要时进行事务状态检查。

(二)顺序消息

  1. 顺序消息概念:顺序消息是指消息的消费顺序与发送顺序一致。RocketMQ 通过分区(Queue)来保证消息在同一个分区内的顺序性。
  2. 顺序生产者代码示例
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

import java.util.List;

public class OrderedProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("ordered_producer_group");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        String[] tags = {"TagA", "TagB", "TagC", "TagD"};
        for (int i = 0; i < 10; i++) {
            int orderId = i % 4;
            Message message = new Message("OrderedTopic", tags[orderId], ("顺序消息 " + i).getBytes());
            SendResult sendResult = producer.send(message, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    Integer id = (Integer) arg;
                    int index = id % mqs.size();
                    return mqs.get(index);
                }
            }, orderId);
            System.out.println(sendResult);
        }

        producer.shutdown();
    }
}

上述代码创建了一个顺序消息生产者,通过 MessageQueueSelector 保证相同 orderId 的消息发送到同一个队列,从而实现顺序消费。

  1. 顺序消费者代码示例
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class OrderedConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ordered_consumer_group");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("OrderedTopic", "*");

        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("收到顺序消息:" + new String(msg.getBody()));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();
        System.out.println("顺序消费者已启动");
    }
}

上述代码创建了一个顺序消息消费者,确保消息按照发送顺序进行消费。

六、RocketMQ 监控与运维

(一)使用 RocketMQ Console 监控

  1. 访问 RocketMQ Console:在浏览器中输入 http://服务器 IP:8080,即可访问 RocketMQ Console。在控制台中,可以查看集群状态、主题信息、消费者组信息等。
  2. 查看主题详情:在控制台的主题列表中,可以查看每个主题的队列数量、消息堆积情况等信息。通过这些信息,可以及时发现和解决消息积压等问题。
  3. 查看消费者组详情:在消费者组列表中,可以查看消费者组的消费进度、消费状态等信息。如果发现某个消费者组消费缓慢,可以进一步排查原因。

(二)日志分析

  1. Namesrv 日志:Namesrv 的日志位于 /root/rocketmq - store/namesrv/logs 目录下。通过查看 namesrv.log 文件,可以了解 NameServer 的启动过程、注册信息等。如果 NameServer 出现异常,可以从日志中查找线索。
  2. Broker 日志:Broker 的日志位于 /root/rocketmq - store/broker/logs 目录下。broker.log 文件记录了 Broker 的运行状态、消息存储和转发等信息。当 Broker 出现消息丢失、存储异常等问题时,需要分析该日志。
  3. Client 日志:生产者和消费者的日志可以通过在代码中配置日志框架来记录。例如,使用 Log4j 或 Logback 配置日志输出,以便在出现问题时能够快速定位。

(三)集群扩展与优化

  1. 增加 Broker 节点:如果集群的消息处理能力不足,可以通过增加 Broker 节点来扩展集群。在 docker - compose.yml 文件中添加新的 Broker 服务,并配置相应的 broker.conf 文件。然后使用 docker - compose up -d 命令启动新的 Broker 节点。
  2. 优化配置参数:根据实际业务场景,调整 RocketMQ 的配置参数,如内存参数(JAVA_OPTS)、存储参数(fileReservedTime 等)。合理的配置参数可以提高 RocketMQ 的性能和稳定性。

七、常见问题及解决方法

(一)消息发送失败

  1. 原因分析
  • NameServer 地址配置错误,导致生产者无法连接到 NameServer。
  • 主题不存在,需要先在 RocketMQ 中创建主题。
  • 网络问题,如防火墙阻止了生产者与 Broker 之间的通信。
  1. 解决方法
  • 检查 NameServer 地址是否正确,确保生产者能够正常连接到 NameServer。
  • 使用 RocketMQ Console 或命令行工具创建主题。
  • 检查防火墙设置,开放生产者与 Broker 之间的通信端口。

(二)消息消费失败

  1. 原因分析
  • 消费者代码中出现异常,导致消息消费失败。
  • 消费者组配置错误,可能与其他消费者组冲突。
  • 消息积压,导致消费者处理不过来。
  1. 解决方法
  • 检查消费者代码,捕获并处理异常,确保消息能够正常消费。
  • 检查消费者组名称,确保其唯一性。
  • 分析消息积压原因,如增加消费者实例数量、优化消费者处理逻辑等。

(三)RocketMQ 集群不稳定

  1. 原因分析
  • 硬件资源不足,如内存、CPU 等。
  • 配置参数不合理,如 Broker 的存储参数、内存参数等。
  • 网络抖动,导致节点之间通信异常。
  1. 解决方法
  • 监控服务器的硬件资源使用情况,及时扩容。
  • 根据实际业务场景,调整 RocketMQ 的配置参数。
  • 检查网络环境,确保网络稳定。

通过以上基于 Docker 的 RocketMQ 部署与实践,你可以快速搭建和使用 RocketMQ 消息队列,并在实际项目中充分发挥其优势,实现高效的异步通信和系统解耦。同时,通过掌握常见问题的解决方法,可以保证 RocketMQ 集群的稳定运行。