MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Spring Cloud RocketMQ 消息驱动应用案例

2022-02-254.7k 阅读

Spring Cloud RocketMQ 消息驱动应用案例

1. Spring Cloud RocketMQ 基础

在微服务架构日益流行的今天,消息队列扮演着至关重要的角色。RocketMQ 作为一款高性能、高可靠的分布式消息队列,被广泛应用于各类企业级项目中。Spring Cloud RocketMQ 则是基于 Spring Cloud 体系对 RocketMQ 的封装与集成,使得在 Spring Cloud 项目中使用 RocketMQ 变得更加便捷。

1.1 RocketMQ 简介

RocketMQ 是阿里巴巴开源的分布式消息中间件,经历了多次双十一的严苛考验,具有低延迟、高并发、高可用以及可靠的消息传递等特性。它采用了生产者(Producer)、消费者(Consumer)、主题(Topic)、队列(Queue)等核心概念。生产者负责发送消息到指定的主题,而消费者则从主题的队列中拉取消息进行处理。

1.2 Spring Cloud RocketMQ 集成优势

通过 Spring Cloud RocketMQ 的集成,开发者可以充分利用 Spring Cloud 的生态优势,如自动配置、依赖管理等。同时,借助 RocketMQ 的强大功能,实现微服务之间的异步通信、解耦业务模块,提高系统的整体性能和可扩展性。例如,在一个电商系统中,订单服务生成订单后,可以通过 RocketMQ 发送消息给库存服务、物流服务等,这些服务异步处理消息,避免了服务之间的强耦合。

2. 环境搭建

在开始构建 Spring Cloud RocketMQ 消息驱动应用案例之前,需要搭建好相应的开发环境。

2.1 安装 RocketMQ

首先,从 RocketMQ 官方网站下载 RocketMQ 的二进制包。解压下载的文件后,进入 bin 目录。启动 NameServer,在命令行中执行 mqnamesrv 命令(在 Linux 或 macOS 系统下)。NameServer 是 RocketMQ 的路由中心,负责管理 Topic 与 Broker 的映射关系。

然后,启动 Broker,执行 mqbroker -n localhost:9876 命令,其中 -n 参数指定 NameServer 的地址和端口。

2.2 创建 Spring Boot 项目

利用 Spring Initializr(https://start.spring.io/)创建一个新的 Spring Boot 项目。在依赖选择中,添加 Spring Cloud Alibaba RocketMQ Starter 依赖。如果项目使用 Maven 构建,在 pom.xml 文件中会自动添加如下依赖:

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-rocketmq</artifactId>
</dependency>

如果使用 Gradle 构建项目,则在 build.gradle 文件中添加:

implementation 'com.alibaba.cloud:spring-cloud-starter-alibaba-rocketmq'

2.3 配置 RocketMQ

application.propertiesapplication.yml 文件中配置 RocketMQ 的相关参数。例如,使用 application.yml 进行配置:

spring:
  cloud:
    rocketmq:
      name-server: localhost:9876
      producer:
        group: my-producer-group
      consumer:
        group: my-consumer-group
        topics: my-topic

上述配置中,name-server 指定了 RocketMQ NameServer 的地址,producer.group 定义了生产者组,consumer.group 定义了消费者组,consumer.topics 定义了消费者要订阅的主题。

3. 消息生产者实现

接下来,创建一个消息生产者来发送消息到 RocketMQ。

3.1 创建生产者服务类

在项目的 src/main/java 目录下,创建一个 ProducerService 类:

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class ProducerService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public void sendMessage(String topic, String message) {
        rocketMQTemplate.convertAndSend(topic, message);
    }
}

在上述代码中,通过 @Autowired 注入了 RocketMQTemplate,这是 Spring Cloud RocketMQ 提供的用于发送消息的核心类。convertAndSend 方法将消息发送到指定的主题。

3.2 测试生产者

创建一个测试类 ProducerServiceTest 来验证生产者是否能正常工作:

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class ProducerServiceTest {

    @Autowired
    private ProducerService producerService;

    @Test
    public void testSendMessage() {
        String topic = "my-topic";
        String message = "Hello, RocketMQ!";
        producerService.sendMessage(topic, message);
    }
}

运行上述测试方法,如果没有报错,消息将被发送到 RocketMQ 的 my-topic 主题中。

4. 消息消费者实现

消息发送成功后,需要创建一个消息消费者来接收并处理消息。

4.1 创建消费者类

src/main/java 目录下,创建一个 ConsumerListener 类:

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(topic = "my-topic", consumerGroup = "my-consumer-group")
public class ConsumerListener implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        System.out.println("Received message: " + message);
        // 处理消息的业务逻辑
        // 例如,更新数据库、调用其他服务等
    }
}

在上述代码中,通过 @RocketMQMessageListener 注解指定了消费者要监听的主题和消费者组。实现 RocketMQListener 接口的 onMessage 方法,在该方法中处理接收到的消息。这里简单地打印了接收到的消息,实际应用中可以根据业务需求进行更复杂的处理。

4.2 消息处理的可靠性保证

在实际应用中,确保消息处理的可靠性至关重要。RocketMQ 提供了多种机制来保证消息的可靠消费。例如,消费者在处理完消息后,可以手动确认消息,防止消息丢失。在 ConsumerListener 类中,可以通过如下方式实现手动确认:

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(topic = "my-topic", consumerGroup = "my-consumer-group")
public class ConsumerListener implements RocketMQListener<String, ConsumeConcurrentlyContext> {

    @Override
    public void onMessage(String message, ConsumeConcurrentlyContext context) {
        try {
            System.out.println("Received message: " + message);
            // 处理消息的业务逻辑
            // 例如,更新数据库、调用其他服务等
            // 模拟业务处理成功
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        } catch (Exception e) {
            // 处理失败,返回重试状态
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }
}

上述代码中,ConsumeConcurrentlyContext 提供了确认消息处理状态的方法。如果消息处理成功,返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS;如果处理失败,返回 ConsumeConcurrentlyStatus.RECONSUME_LATER,RocketMQ 将在一定时间后重新投递该消息。

5. 高级特性应用

除了基本的消息发送和接收,Spring Cloud RocketMQ 还支持许多高级特性。

5.1 事务消息

在一些业务场景中,需要保证消息发送与本地事务的一致性。例如,在电商系统中,创建订单成功后,需要发送消息通知库存服务扣减库存,这两个操作需要保证原子性。RocketMQ 提供了事务消息来满足这种需求。

首先,创建一个事务生产者服务类 TransactionProducerService

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQTransactionCallback;
import org.apache.rocketmq.spring.support.RocketMQTransactionSendResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Service
public class TransactionProducerService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public void sendTransactionMessage(String topic, String message) {
        Message<String> msg = MessageBuilder.withPayload(message).build();
        RocketMQTransactionSendResult sendResult = rocketMQTemplate.sendMessageInTransaction(topic, msg, null);
        System.out.println("Transaction send result: " + sendResult);
    }
}

然后,创建一个事务监听器 TransactionListenerImpl 来处理事务状态:

import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQTransactionContext;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
@RocketMQTransactionListener
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            // 执行本地事务,例如更新数据库
            // 如果本地事务执行成功,返回 COMMIT_MESSAGE
            return RocketMQLocalTransactionState.COMMIT_MESSAGE;
        } catch (Exception e) {
            // 如果本地事务执行失败,返回 ROLLBACK_MESSAGE
            return RocketMQLocalTransactionState.ROLLBACK_MESSAGE;
        }
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(RocketMQTransactionContext context) {
        // 检查本地事务状态,例如查询数据库
        // 如果本地事务已提交,返回 COMMIT_MESSAGE
        // 如果本地事务未提交,返回 ROLLBACK_MESSAGE 或 UNKNOWN
        return RocketMQLocalTransactionState.COMMIT_MESSAGE;
    }
}

在上述代码中,executeLocalTransaction 方法在发送半消息成功后执行本地事务,checkLocalTransaction 方法用于 RocketMQ 回查本地事务状态。

5.2 顺序消息

在某些业务场景下,消息的顺序性至关重要。例如,在订单处理中,订单创建、支付、发货等消息需要按顺序处理。RocketMQ 支持顺序消息的发送和消费。

对于顺序消息的发送,生产者需要将消息发送到同一个队列。修改 ProducerService 类来发送顺序消息:

import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class ProducerService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public void sendOrderlyMessage(String topic, String message, int queueId) {
        SendResult sendResult = rocketMQTemplate.syncSendOrderly(topic, message, queueId);
        System.out.println("Orderly send result: " + sendResult);
    }
}

对于顺序消息的消费,消费者需要保证同一个队列的消息按顺序处理。修改 ConsumerListener 类来处理顺序消息:

import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(topic = "my-topic", consumerGroup = "my-consumer-group")
public class ConsumerListener implements RocketMQListener<String, ConsumeOrderlyContext> {

    @Override
    public void onMessage(String message, ConsumeOrderlyContext context) {
        System.out.println("Received orderly message: " + message);
        // 处理顺序消息的业务逻辑
        // 处理完成后返回 CONSUME_SUCCESS
        return ConsumeOrderlyStatus.CONSUME_SUCCESS;
    }
}

在上述代码中,syncSendOrderly 方法将消息发送到指定队列,消费者通过实现 RocketMQListener<String, ConsumeOrderlyContext> 接口来按顺序处理消息。

6. 集群与高可用配置

在生产环境中,为了保证系统的高可用性和高性能,需要对 RocketMQ 进行集群配置。

6.1 多 NameServer 配置

NameServer 是 RocketMQ 的路由中心,多个 NameServer 可以组成集群以提高可用性。修改 application.yml 文件,配置多个 NameServer:

spring:
  cloud:
    rocketmq:
      name-server: localhost:9876;localhost:9877
      producer:
        group: my-producer-group
      consumer:
        group: my-consumer-group
        topics: my-topic

在上述配置中,通过分号分隔多个 NameServer 的地址和端口。

6.2 多 Broker 配置

Broker 是 RocketMQ 存储和转发消息的核心组件,多个 Broker 可以组成集群。在 conf 目录下,创建多个 broker.conf 文件,分别配置不同的 Broker。例如,broker-a.confbroker-b.conf

# broker-a.conf
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
namesrvAddr = localhost:9876;localhost:9877
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH

# broker-b.conf
brokerClusterName = DefaultCluster
brokerName = broker-b
brokerId = 1
namesrvAddr = localhost:9876;localhost:9877
deleteWhen = 04
fileReservedTime = 48
brokerRole = SLAVE
flushDiskType = ASYNC_FLUSH

在上述配置中,brokerClusterName 定义了集群名称,brokerName 定义了 Broker 名称,brokerId 标识 Broker 的唯一 ID,namesrvAddr 指定了 NameServer 的地址,brokerRole 定义了 Broker 的角色(主或从),flushDiskType 定义了刷盘策略。

然后,分别启动两个 Broker:

mqbroker -n localhost:9876;localhost:9877 -c conf/broker-a.conf
mqbroker -n localhost:9876;localhost:9877 -c conf/broker-b.conf

6.3 生产者与消费者的高可用

在集群环境下,生产者和消费者会自动发现 NameServer 和 Broker 的变化。生产者在发送消息时,会根据 NameServer 的路由信息将消息发送到合适的 Broker。消费者在订阅主题时,也会从多个 Broker 中拉取消息。这种机制保证了在部分 NameServer 或 Broker 出现故障时,系统仍能正常工作。

例如,生产者在发送消息时,RocketMQTemplate 会根据配置的多个 NameServer 地址,自动选择可用的 NameServer 获取路由信息,并将消息发送到对应的 Broker。消费者在启动时,会向多个 NameServer 注册,并从多个 Broker 的队列中拉取消息进行处理。

7. 性能优化与监控

为了保证系统的高性能和稳定性,需要对 Spring Cloud RocketMQ 应用进行性能优化和监控。

7.1 性能优化

  • 批量发送消息:生产者可以采用批量发送消息的方式,减少网络开销。修改 ProducerService 类,实现批量发送消息:
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;

@Service
public class ProducerService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public void sendBatchMessage(String topic, List<String> messages) {
        List<org.springframework.messaging.Message<String>> batchMessages = new ArrayList<>();
        for (String message : messages) {
            batchMessages.add(org.springframework.messaging.support.MessageBuilder.withPayload(message).build());
        }
        SendResult sendResult = rocketMQTemplate.syncSend(topic, batchMessages);
        System.out.println("Batch send result: " + sendResult);
    }
}
  • 合理设置参数:在 application.yml 文件中,合理设置生产者和消费者的参数,如 sendMsgTimeout(发送消息超时时间)、maxMessageSize(最大消息大小)、consumeThreadMinconsumeThreadMax(消费者线程数)等。
spring:
  cloud:
    rocketmq:
      name-server: localhost:9876
      producer:
        group: my-producer-group
        send-msg-timeout: 3000
        max-message-size: 4194304
      consumer:
        group: my-consumer-group
        topics: my-topic
        consume-thread-min: 10
        consume-thread-max: 20

7.2 监控

RocketMQ 提供了多种监控方式,如 RocketMQ Console 和 Prometheus + Grafana。

  • RocketMQ Console:从 GitHub 上下载 RocketMQ Console 的源码,编译并启动。在浏览器中访问 RocketMQ Console 的地址(如 http://localhost:8080),可以查看 Broker、Topic、Consumer 等的运行状态,包括消息发送和消费的速率、堆积量等信息。

  • Prometheus + Grafana:通过 Prometheus 采集 RocketMQ 的指标数据,如消息发送量、消费量、延迟等。然后,使用 Grafana 进行数据可视化,创建各种监控图表,实时监控 RocketMQ 的运行状态。

在 Spring Cloud RocketMQ 应用中,可以通过添加相应的依赖和配置,将应用的指标数据暴露给 Prometheus。例如,在 pom.xml 文件中添加 micrometer-registry-prometheus 依赖:

<dependency>
    <groupId>io.micrometer</groupId>
    <artifactId>micrometer-registry-prometheus</artifactId>
</dependency>

然后,在 application.yml 文件中配置 Prometheus 相关参数:

management:
  metrics:
    tags:
      application: my-application
  endpoints:
    web:
      exposure:
        include: prometheus
  endpoint:
    prometheus:
      enabled: true

通过上述配置,Spring Cloud RocketMQ 应用的指标数据将暴露在 /actuator/prometheus 接口,供 Prometheus 采集。

通过以上性能优化和监控措施,可以确保 Spring Cloud RocketMQ 应用在生产环境中高效、稳定地运行。在实际应用中,还需要根据业务需求和系统规模,不断调整和优化配置,以达到最佳的性能和可用性。同时,持续关注 RocketMQ 的官方文档和社区动态,及时了解新特性和优化建议,也是提升系统性能的重要途径。

以上就是一个完整的 Spring Cloud RocketMQ 消息驱动应用案例,涵盖了从基础概念到实际应用的各个方面,希望对读者在微服务架构中使用 RocketMQ 有所帮助。在实际项目中,可以根据具体业务场景,灵活运用这些知识,构建高性能、高可靠的分布式系统。例如,在大型电商系统中,利用 RocketMQ 的事务消息和顺序消息特性,确保订单处理的准确性和一致性;在金融系统中,通过合理配置集群和监控,保障消息的可靠传递和系统的高可用性。同时,随着业务的发展和数据量的增长,不断优化和扩展系统,以满足日益复杂的业务需求。在性能优化方面,除了批量发送消息和合理设置参数外,还可以考虑异步发送消息、优化消息格式等方式,进一步提升系统的吞吐量和响应速度。在监控方面,除了 RocketMQ Console 和 Prometheus + Grafana 外,还可以结合其他监控工具,如 SkyWalking 等,实现全链路的性能监控和故障排查。总之,Spring Cloud RocketMQ 为微服务架构下的消息驱动应用提供了强大的支持,通过深入理解和合理运用其特性,可以构建出健壮、高效的分布式系统。