MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

RocketMQ架构在分布式系统中的应用实践

2022-01-017.2k 阅读

RocketMQ架构概述

RocketMQ是阿里巴巴开源的一款分布式消息中间件,具有低延迟、高并发、高可用以及可靠的消息传递等特性。在分布式系统中,其架构设计精妙,主要由NameServer、Broker、Producer和Consumer等组件构成。

NameServer

NameServer是一个轻量级的元数据服务,在RocketMQ架构中起着至关重要的导航作用。它负责存储Topic和Broker的路由信息。NameServer集群中的各个节点之间相互独立,没有任何信息同步机制。每个NameServer节点都定期从Broker节点获取最新的路由数据并进行存储。

当Producer发送消息或者Consumer消费消息时,首先会向NameServer获取相应Topic的路由信息。Producer根据这些路由信息选择合适的Broker节点发送消息,而Consumer则依据路由信息从对应的Broker拉取消息。这种设计使得NameServer的部署和维护相对简单,同时也保证了系统的高可用性。即使部分NameServer节点出现故障,其他节点依然可以提供路由服务。

Broker

Broker是RocketMQ的核心组件,负责消息的存储、转发以及与Producer和Consumer的交互。Broker分为Master和Slave两种角色,Master负责处理读写请求,而Slave则主要用于数据备份和读请求分担。

Broker采用了基于Topic的消息存储方式,每个Topic可以配置多个Queue,消息会按照一定的规则被存储到不同的Queue中。这种设计不仅提高了消息的存储和读取效率,还为分布式消费提供了基础。在消息存储方面,Broker使用了CommitLog来顺序存储所有的消息,同时通过ConsumeQueue来记录每个Queue的消息索引,从而加快消息的查询速度。

当Producer发送消息到Broker时,Broker会将消息持久化到CommitLog中,并更新相应的ConsumeQueue。而当Consumer从Broker拉取消息时,Broker会根据ConsumeQueue中的索引信息从CommitLog中读取消息并返回给Consumer。

Producer

Producer即消息生产者,负责将业务系统中的消息发送到RocketMQ集群。Producer在发送消息前,需要先从NameServer获取Topic的路由信息,然后根据负载均衡算法选择一个Broker节点发送消息。

RocketMQ提供了多种消息发送方式,包括同步发送、异步发送和单向发送。同步发送是指Producer发送消息后,会等待Broker的响应,只有在收到成功响应后才会继续执行后续操作,这种方式适用于对消息可靠性要求较高的场景。异步发送则是Producer发送消息后,不会等待Broker的响应,而是通过回调函数来处理发送结果,这种方式可以提高发送效率,适用于对响应时间要求较高的场景。单向发送是指Producer只负责发送消息,不关心发送结果,这种方式通常用于对消息可靠性要求不高但对性能要求极高的场景,如日志记录。

Consumer

Consumer即消息消费者,负责从RocketMQ集群中拉取消息并进行业务处理。Consumer在启动时,同样需要从NameServer获取Topic的路由信息,然后根据负载均衡算法分配到具体的Broker节点拉取消息。

RocketMQ支持两种消费模式:集群消费和广播消费。在集群消费模式下,同一个Consumer Group中的多个Consumer实例会平均分摊消费消息,适用于大多数业务场景。而在广播消费模式下,同一个Consumer Group中的每个Consumer实例都会消费到所有的消息,适用于需要将消息广播到所有消费者的场景,如配置更新通知。

RocketMQ在分布式系统中的应用场景

异步解耦

在分布式系统中,不同的服务之间往往存在复杂的调用关系。例如,在一个电商系统中,用户下单后,不仅需要更新订单状态,还可能需要触发库存扣减、积分增加、消息通知等一系列操作。如果这些操作都在下单的同步流程中完成,会导致下单接口的响应时间变长,系统的吞吐量降低。

通过引入RocketMQ,下单操作完成后,只需要将相关的消息发送到消息队列中,而库存扣减、积分增加、消息通知等操作可以由对应的Consumer异步处理。这样不仅提高了下单接口的响应速度,还将各个服务之间的耦合度降低,使得每个服务可以独立地进行扩展和维护。

削峰填谷

在一些具有明显流量高峰和低谷的应用场景中,如电商的促销活动、秒杀等,短时间内会产生大量的请求。如果直接将这些请求发送到后端服务,可能会导致后端服务因过载而崩溃。

RocketMQ可以作为一个缓冲层,在流量高峰时,将大量的请求消息存储在消息队列中,后端服务按照自身的处理能力从队列中逐步拉取消息进行处理。而在流量低谷时,消息队列中的消息也能被及时处理,从而实现削峰填谷的效果,保证后端服务的稳定性。

数据分发与广播

在分布式系统中,有时候需要将某些数据或消息广播到多个服务或节点。例如,配置中心的配置更新需要通知到所有依赖该配置的服务。通过RocketMQ的广播消费模式,可以很方便地实现这一功能。只需要将配置更新消息发送到特定的Topic,所有订阅该Topic的Consumer实例都会收到消息并进行相应的配置更新操作。

RocketMQ架构在分布式系统中的应用实践

环境搭建

  1. 安装Java环境:RocketMQ基于Java开发,首先需要确保系统中安装了JDK 1.8或以上版本。可以通过以下命令检查Java版本:
java -version
  1. 下载RocketMQ:从RocketMQ官方GitHub仓库(https://github.com/apache/rocketmq)下载最新的二进制发行包,解压到指定目录,例如`/opt/rocketmq`。
  2. 启动NameServer:进入RocketMQ的bin目录,执行以下命令启动NameServer:
nohup sh mqnamesrv &

启动成功后,可以在logs/namesrv.log文件中查看启动日志。 4. 启动Broker:在启动Broker之前,需要根据实际情况修改conf/broker.conf配置文件,例如设置Broker的IP地址、端口号、存储路径等。然后执行以下命令启动Broker:

nohup sh mqbroker -c /opt/rocketmq/conf/broker.conf &

同样,可以在logs/broker.log文件中查看Broker的启动日志。

消息发送示例(Java)

  1. 引入依赖:在Maven项目的pom.xml文件中添加RocketMQ客户端依赖:
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.9.4</version>
</dependency>
  1. 同步发送消息:以下是一个简单的同步发送消息的Java代码示例:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class SyncProducer {
    public static void main(String[] args) throws Exception {
        // 创建一个Producer实例
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        // 设置NameServer地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        // 启动Producer
        producer.start();

        for (int i = 0; i < 10; i++) {
            // 创建消息对象,指定Topic、Tag和消息内容
            Message message = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes("UTF-8"));
            // 同步发送消息,等待Broker响应
            SendResult sendResult = producer.send(message);
            System.out.printf("%s%n", sendResult);
        }

        // 关闭Producer
        producer.shutdown();
    }
}
  1. 异步发送消息:异步发送消息的示例代码如下:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("group2");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        for (int i = 0; i < 10; i++) {
            final int index = i;
            Message message = new Message("TopicTest", "TagB", ("Hello Async RocketMQ " + i).getBytes("UTF-8"));
            producer.send(message, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.printf("%d Send message success: %s%n", index, sendResult);
                }

                @Override
                public void onException(Throwable e) {
                    System.out.printf("%d Send message failed: %s%n", index, e);
                }
            });
        }

        Thread.sleep(3000);
        producer.shutdown();
    }
}
  1. 单向发送消息:单向发送消息的示例代码如下:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class OnewayProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("group3");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        for (int i = 0; i < 10; i++) {
            Message message = new Message("TopicTest", "TagC", ("Hello Oneway RocketMQ " + i).getBytes("UTF-8"));
            producer.sendOneway(message);
        }

        producer.shutdown();
    }
}

消息消费示例(Java)

  1. 集群消费:以下是集群消费模式的Java代码示例:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class ClusterConsumer {
    public static void main(String[] args) throws Exception {
        // 创建一个Consumer实例
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group4");
        // 设置NameServer地址
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // 订阅Topic和Tag
        consumer.subscribe("TopicTest", "TagA || TagB || TagC");

        // 注册消息监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.printf("%s Receive New Messages: %s%n", Thread.currentThread().getName(), new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 启动Consumer
        consumer.start();
        System.out.println("Consumer Started.");
    }
}
  1. 广播消费:广播消费模式的示例代码如下:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class BroadcastConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group5");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
        consumer.subscribe("TopicTest", "TagA || TagB || TagC");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.printf("%s Receive New Messages: %s%n", Thread.currentThread().getName(), new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("Broadcast Consumer Started.");
    }
}

RocketMQ的高级特性与优化

消息顺序性

RocketMQ可以保证消息在单个Queue中的顺序性。在某些业务场景中,如订单处理流程,订单创建、支付、发货等操作需要按照顺序执行,这就需要保证消息的顺序性。

要实现消息顺序性,Producer在发送消息时,需要根据业务主键(如订单ID)选择固定的Queue进行发送。而Consumer在消费消息时,需要使用MessageListenerOrderly监听器,该监听器会保证同一Queue中的消息按顺序消费。

以下是一个简单的顺序消息发送和消费示例:

  1. 顺序消息发送
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

import java.util.List;

public class OrderedProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("group6");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        String[] tags = {"Tag1", "Tag2", "Tag3", "Tag4", "Tag5"};
        for (int i = 0; i < 10; i++) {
            int orderId = i % 10;
            Message message = new Message("OrderTopic", tags[i % tags.length], ("Hello Ordered RocketMQ " + i).getBytes("UTF-8"));
            SendResult sendResult = producer.send(message, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    Integer id = (Integer) arg;
                    int index = id % mqs.size();
                    return mqs.get(index);
                }
            }, orderId);
            System.out.printf("%s%n", sendResult);
        }

        producer.shutdown();
    }
}
  1. 顺序消息消费
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class OrderedConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group7");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("OrderTopic", "*");

        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                for (MessageExt msg : msgs) {
                    System.out.printf("%s Receive New Messages: %s%n", Thread.currentThread().getName(), new String(msg.getBody()));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();
        System.out.println("Ordered Consumer Started.");
    }
}

消息可靠性

RocketMQ通过多种机制保证消息的可靠性。首先,在消息发送方面,Producer可以通过设置sendMsgTimeoutretryTimesWhenSendFailed等参数来控制消息发送的重试次数和超时时间,确保消息能够成功发送到Broker。

在Broker端,消息会被持久化到磁盘,并且Broker采用了Master - Slave架构,Slave节点会定期从Master节点同步数据,以防止数据丢失。即使Master节点出现故障,Slave节点可以切换为Master继续提供服务。

在消息消费方面,Consumer在成功消费消息后才会向Broker发送确认消息,否则Broker会认为消息消费失败并进行重试。同时,RocketMQ还提供了死信队列(Dead Letter Queue)机制,对于多次消费失败的消息,会被发送到死信队列,方便后续排查和处理。

性能优化

  1. 批量发送消息:Producer可以将多条消息合并成一个批量消息进行发送,减少网络传输次数,提高发送效率。示例代码如下:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

import java.util.ArrayList;
import java.util.List;

public class BatchProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("group8");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        List<Message> messages = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            Message message = new Message("BatchTopic", ("Hello Batch RocketMQ " + i).getBytes("UTF-8"));
            messages.add(message);
        }

        producer.send(messages);
        producer.shutdown();
    }
}
  1. 异步消费:Consumer可以采用异步消费的方式,提高消费效率。在异步消费时,Consumer在接收到消息后,将消息处理任务提交到线程池进行处理,主线程继续接收新的消息。示例代码如下:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncConsumer {
    private static final ExecutorService executorService = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group9");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("AsyncTopic", "*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    executorService.submit(new Runnable() {
                        @Override
                        public void run() {
                            System.out.printf("%s Receive New Messages: %s%n", Thread.currentThread().getName(), new String(msg.getBody()));
                        }
                    });
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("Async Consumer Started.");
    }
}

RocketMQ与其他消息队列的比较

与Kafka的比较

  1. 架构设计:Kafka采用了Zookeeper来管理集群元数据,而RocketMQ使用NameServer,NameServer相对Zookeeper来说更加轻量级,部署和维护成本更低。在Kafka中,Broker之间通过副本机制保证数据的可靠性和高可用性,而RocketMQ采用Master - Slave架构,Master负责读写,Slave用于备份和分担读请求。
  2. 消息顺序性:Kafka只能保证分区内的消息顺序性,而RocketMQ可以通过合理的配置,在单个Queue内保证消息的严格顺序性,并且在一些复杂场景下,如分布式事务消息,RocketMQ对顺序性的支持更加灵活。
  3. 消息可靠性:Kafka通过多副本机制保证消息的可靠性,而RocketMQ除了Master - Slave架构的备份机制外,还提供了多种消息发送和消费的确认机制,以及死信队列等功能,在消息可靠性方面表现更为全面。
  4. 应用场景:Kafka适用于大数据领域的日志收集、实时数据处理等场景,其高吞吐量和低延迟特性适合处理海量数据。而RocketMQ在分布式系统的异步解耦、削峰填谷、顺序消息处理等场景中表现出色,更侧重于传统的企业级应用开发。

与RabbitMQ的比较

  1. 协议支持:RabbitMQ支持多种消息协议,如AMQP、STOMP等,而RocketMQ主要使用自定义协议,在与其他系统的集成方面,RabbitMQ具有更大的优势。
  2. 性能:RocketMQ在高并发场景下的性能表现优于RabbitMQ,特别是在消息吞吐量和低延迟方面。RocketMQ采用了基于文件映射的存储方式和高效的网络通信框架,能够处理大量的消息。
  3. 可靠性:RabbitMQ通过持久化、镜像队列等机制保证消息的可靠性,RocketMQ同样具备强大的可靠性保证机制,并且在分布式事务消息处理方面,RocketMQ提供了更为完善的解决方案。
  4. 应用场景:RabbitMQ适用于对消息协议兼容性要求较高、对可靠性有一定要求但并发量不是特别高的场景,如企业内部的业务流程集成。而RocketMQ更适合大型分布式系统,尤其是对性能和可靠性要求极高的场景。

通过以上对RocketMQ架构在分布式系统中的应用实践的详细介绍,包括架构概述、应用场景、代码示例以及与其他消息队列的比较,可以看出RocketMQ在分布式系统中具有强大的功能和广泛的应用前景。无论是对于高并发的互联网应用,还是对可靠性要求极高的企业级应用,RocketMQ都能提供可靠的消息传递解决方案。在实际应用中,需要根据具体的业务需求和系统架构,合理地选择和使用RocketMQ,以充分发挥其优势。