Spring Cloud RocketMQ 的消息驱动应用
Spring Cloud RocketMQ 基础概述
Spring Cloud RocketMQ 简介
Spring Cloud RocketMQ 是基于 Spring Cloud 体系对 RocketMQ 的封装与集成,旨在让 Spring Cloud 应用能够便捷地使用 RocketMQ 的消息队列功能。RocketMQ 作为一款高性能、高可靠的分布式消息队列,在大数据、高并发系统中有着广泛应用。而 Spring Cloud 提供了微服务架构下的一系列解决方案,两者结合,为微服务之间的消息通信提供了强大的支持。
Spring Cloud RocketMQ 遵循 Spring Cloud 规范,以约定大于配置的方式,简化了 RocketMQ 在 Spring Cloud 项目中的使用。它提供了统一的消息发送和接收接口,使得开发人员无需深入了解 RocketMQ 的底层细节,就能快速实现消息驱动的应用。
为何选择 Spring Cloud RocketMQ
在微服务架构中,各服务之间的通信至关重要。传统的同步调用方式在高并发场景下会导致性能瓶颈,而消息队列作为一种异步解耦的通信方式,能有效提升系统的整体性能和可扩展性。
RocketMQ 本身具备高吞吐量、低延迟、高可用性等特性。它支持顺序消息、事务消息等复杂场景,在金融、电商等领域有着成熟的应用案例。结合 Spring Cloud 后,开发人员可以利用 Spring Cloud 的服务注册与发现、配置管理等功能,与 RocketMQ 消息通信功能深度融合,构建出更加健壮、灵活的微服务架构。
例如,在电商系统中,订单服务创建订单后,可以通过 Spring Cloud RocketMQ 发送消息给库存服务和物流服务,实现库存扣减和物流单创建。这种异步解耦的方式,使得各个服务可以独立部署、扩展,提高了系统的整体可靠性。
环境搭建
安装 RocketMQ
- 下载 RocketMQ:从 RocketMQ 官方网站(https://rocketmq.apache.org/)下载适合的版本,当前最新稳定版本为 5.1.0 。解压下载的压缩包,例如解压到
/opt/rocketmq
目录。 - 配置环境变量:在
~/.bashrc
文件中添加以下内容:
export ROCKETMQ_HOME=/opt/rocketmq
export PATH=$ROCKETMQ_HOME/bin:$PATH
然后执行 source ~/.bashrc
使配置生效。
3. 启动 NameServer:在终端执行 mqnamesrv
命令,启动 NameServer。NameServer 是 RocketMQ 的路由中心,负责存储 Topic 与 Broker 的映射关系。
4. 启动 Broker:在启动 Broker 之前,需要根据实际情况修改 conf/broker.conf
文件。例如,配置 Broker 的 IP 地址和端口:
brokerIP1 = 192.168.1.100
listenPort = 10911
然后执行 mqbroker -n 192.168.1.100:9876 -c /opt/rocketmq/conf/broker.conf
命令启动 Broker。其中,192.168.1.100:9876
是 NameServer 的地址和端口。
引入 Spring Cloud RocketMQ 依赖
- Maven 项目:在
pom.xml
文件中添加以下依赖:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-rocketmq</artifactId>
</dependency>
- Gradle 项目:在
build.gradle
文件中添加以下依赖:
implementation 'org.springframework.cloud:spring-cloud-starter-rocketmq'
同时,确保项目中已经引入了 Spring Cloud 相关的基础依赖,例如 spring-cloud-starter-bootstrap
等,以支持 Spring Cloud 的配置管理等功能。
配置 Spring Cloud RocketMQ
在 application.properties
或 application.yml
文件中配置 RocketMQ 的相关参数。以 application.yml
为例:
spring:
rocketmq:
name-server: 192.168.1.100:9876
producer:
group: my-producer-group
send-message-timeout: 3000
consumer:
group: my-consumer-group
subscribe:
- topic: my-topic
tag: *
上述配置中,name-server
配置了 RocketMQ NameServer 的地址;producer.group
配置了生产者组名称;producer.send-message-timeout
配置了发送消息的超时时间;consumer.group
配置了消费者组名称;consumer.subscribe
配置了消费者订阅的 Topic 和 Tag。
消息生产者
创建消息生产者
在 Spring Boot 应用中,通过创建 RocketMQTemplate
来发送消息。首先,创建一个配置类 RocketMQConfig
:
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RocketMQConfig {
@Bean
public RocketMQTemplate rocketMQTemplate() {
return new RocketMQTemplate();
}
}
然后,在服务类中注入 RocketMQTemplate
并发送消息:
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class MessageProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendMessage(String topic, String message) {
rocketMQTemplate.convertAndSend(topic, message);
}
}
在上述代码中,RocketMQTemplate
的 convertAndSend
方法将消息发送到指定的 Topic。
发送不同类型消息
- 普通消息:上述示例发送的就是普通消息,直接将消息内容作为参数传入
convertAndSend
方法即可。 - 顺序消息:发送顺序消息时,需要指定消息的分区键(Sharding Key),以确保相同分区键的消息被发送到同一个队列,从而保证顺序。
public void sendOrderlyMessage(String topic, String message, String shardingKey) {
rocketMQTemplate.syncSendOrderly(topic, message, shardingKey);
}
- 事务消息:事务消息用于解决分布式事务问题。首先,创建一个事务监听器
TransactionListenerImpl
:
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.common.message.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@Component
@RocketMQTransactionListener
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(org.springframework.messaging.Message msg, Object arg) {
// 执行本地事务
try {
// 业务逻辑,例如数据库操作
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
return RocketMQLocalTransactionState.ROLLBACK;
}
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// 检查本地事务状态
return RocketMQLocalTransactionState.COMMIT;
}
}
然后,在生产者中发送事务消息:
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
@Service
public class TransactionProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendTransactionMessage(String topic, String message) {
Message<String> msg = MessageBuilder.withPayload(message).build();
rocketMQTemplate.sendMessageInTransaction(topic, msg, null);
}
}
在上述代码中,sendMessageInTransaction
方法发送事务消息,事务监听器 TransactionListenerImpl
负责执行和检查本地事务。
消息消费者
创建消息消费者
通过 @RocketMQMessageListener
注解来创建消息消费者。首先,创建一个消费者类 MessageConsumer
:
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(topic = "my-topic", consumerGroup = "my-consumer-group")
public class MessageConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("Received message: " + message);
// 处理消息的业务逻辑
}
}
在上述代码中,@RocketMQMessageListener
注解指定了消费者订阅的 Topic 和消费者组,RocketMQListener
接口的 onMessage
方法用于处理接收到的消息。
消费模式
- 集群消费:默认情况下,Spring Cloud RocketMQ 使用集群消费模式。在集群消费模式下,同一个消费者组内的多个消费者实例会分摊消费消息,提高消费效率。例如,有 3 个消费者实例属于同一个消费者组,订阅了一个有 3 个队列的 Topic,那么每个消费者实例会负责消费一个队列的消息。
- 广播消费:如果需要广播消费,可以在
@RocketMQMessageListener
注解中设置messageModel = MessageModel.BROADCASTING
。在广播消费模式下,每个消费者实例都会接收到 Topic 中的所有消息,适用于需要所有消费者都处理相同消息的场景,如配置更新通知等。
@Component
@RocketMQMessageListener(topic = "my-topic", consumerGroup = "my-consumer-group", messageModel = MessageModel.BROADCASTING)
public class BroadcastConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("Received broadcast message: " + message);
// 处理消息的业务逻辑
}
}
消息过滤
- Tag 过滤:在配置消费者时,可以通过设置
tag
进行简单的消息过滤。例如:
spring:
rocketmq:
consumer:
subscribe:
- topic: my-topic
tag: tag1 || tag2
上述配置表示消费者只接收 my-topic
中 Tag 为 tag1
或 tag2
的消息。
2. SQL92 过滤:RocketMQ 支持基于 SQL92 语法的消息过滤。首先,在生产者发送消息时,需要设置消息的属性:
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class SqlFilterProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendSqlFilterMessage() {
Message message = new Message("my-topic", "tag1", "message content".getBytes());
message.putUserProperty("age", "25");
rocketMQTemplate.getProducer().send(message);
}
}
然后,在消费者配置中设置 SQL 过滤表达式:
@Component
@RocketMQMessageListener(topic = "my-topic", consumerGroup = "my-consumer-group", selectorExpression = "age > 20")
public class SqlFilterConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("Received filtered message: " + message);
// 处理消息的业务逻辑
}
}
上述配置表示消费者只接收 my-topic
中 age
属性大于 20 的消息。
高级特性与优化
消息重试机制
- 生产者重试:当生产者发送消息失败时,RocketMQ 会自动进行重试。默认情况下,生产者会重试 2 次。可以通过
spring.rocketmq.producer.retry-times-when-send-failed
属性来配置重试次数。例如:
spring:
rocketmq:
producer:
retry-times-when-send-failed: 3
- 消费者重试:当消费者消费消息失败时,RocketMQ 也会进行重试。默认情况下,普通消息会重试 16 次,顺序消息会一直重试。可以通过
spring.rocketmq.consumer.max-retry-times
属性来配置普通消息的最大重试次数。例如:
spring:
rocketmq:
consumer:
max-retry-times: 5
当重试次数达到上限后,消息会被发送到死信队列(DLQ)。可以通过 spring.rocketmq.consumer.dlq-topic
属性来配置死信队列的 Topic。例如:
spring:
rocketmq:
consumer:
dlq-topic: my-dlq-topic
消息幂等性
在分布式系统中,消息可能会因为网络问题等原因被重复消费。为了保证消息处理的幂等性,可以在消费者处理消息时,通过数据库唯一索引、Redis 等方式进行去重。
以数据库唯一索引为例,假设消息内容是订单信息,包含订单号 orderId
。在处理订单消息时,首先尝试将订单信息插入到数据库中,数据库表对 orderId
字段设置唯一索引。如果插入成功,表示该订单是第一次处理;如果插入失败,捕获唯一约束异常,说明该订单已经处理过,直接返回成功即可。
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import javax.transaction.Transactional;
@Component
@RocketMQMessageListener(topic = "order-topic", consumerGroup = "order-consumer-group")
public class OrderConsumer implements RocketMQListener<Order> {
@PersistenceContext
private EntityManager entityManager;
@Override
@Transactional
public void onMessage(Order order) {
try {
entityManager.persist(order);
} catch (Exception e) {
// 处理唯一约束异常,说明订单已处理过
}
}
}
性能优化
- 批量发送消息:生产者可以批量发送消息,以减少网络开销,提高发送性能。
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
@Service
public class BatchProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendBatchMessage(String topic) {
List<String> messages = new ArrayList<>();
messages.add("message1");
messages.add("message2");
messages.add("message3");
rocketMQTemplate.send(topic, messages);
}
}
- 异步消费:消费者可以采用异步消费的方式,提高消费效率。在
@RocketMQMessageListener
注解中设置consumeMode = ConsumeMode.ASYNC_SUBSCRIBE
。
@Component
@RocketMQMessageListener(topic = "my-topic", consumerGroup = "my-consumer-group", consumeMode = ConsumeMode.ASYNC_SUBSCRIBE)
public class AsyncConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
// 异步处理消息
new Thread(() -> {
// 业务逻辑
}).start();
}
}
- 调整线程池参数:RocketMQ 的生产者和消费者都使用线程池来处理消息。可以通过配置线程池参数来优化性能。例如,在消费者配置中,可以设置
spring.rocketmq.consumer.consume-thread-min
和spring.rocketmq.consumer.consume-thread-max
来调整消费线程池的最小和最大线程数。
spring:
rocketmq:
consumer:
consume-thread-min: 10
consume-thread-max: 20
与其他 Spring Cloud 组件集成
与 Eureka 集成
- 注册生产者和消费者服务:如果项目中使用 Eureka 作为服务注册与发现组件,生产者和消费者服务都需要注册到 Eureka 服务器。在
pom.xml
文件中添加 Eureka 客户端依赖:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
在 application.yml
文件中配置 Eureka 客户端:
eureka:
client:
service-url:
defaultZone: http://localhost:8761/eureka/
然后,在 Spring Boot 启动类上添加 @EnableEurekaClient
注解。
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
@SpringBootApplication
@EnableEurekaClient
public class MessageApp {
public static void main(String[] args) {
SpringApplication.run(MessageApp.class, args);
}
}
这样,生产者和消费者服务就可以在 Eureka 服务器上注册,方便其他服务发现和调用。
与 Config Server 集成
- 配置共享:Spring Cloud Config Server 用于集中管理微服务的配置。可以将 Spring Cloud RocketMQ 的配置,如 NameServer 地址、生产者和消费者组等,统一存储在 Config Server 中。
首先,在 Config Server 项目的
application.yml
文件中添加配置文件的存储路径和格式等相关配置:
spring:
cloud:
config:
server:
git:
uri: https://github.com/your-repo/config-repo
search-paths: config-repo
username: your-username
password: your-password
label: master
然后,在生产者和消费者项目的 bootstrap.yml
文件中配置 Config Server 的地址:
spring:
application:
name: message-service
cloud:
config:
uri: http://localhost:8888
fail-fast: true
retry:
initial-interval: 1000
multiplier: 1.5
max-interval: 10000
max-attempts: 5
这样,生产者和消费者服务就可以从 Config Server 中获取 RocketMQ 的配置信息,方便统一管理和修改。
与 Zipkin 集成
- 链路追踪:Zipkin 是一个分布式链路追踪系统,可以帮助我们分析消息在各个微服务之间的传递过程。在生产者和消费者项目中添加 Zipkin 依赖:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-sleuth</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-zipkin</artifactId>
</dependency>
在 application.yml
文件中配置 Zipkin 服务器的地址:
spring:
sleuth:
sampler:
probability: 1.0
zipkin:
base-url: http://localhost:9411
通过上述配置,Spring Cloud RocketMQ 发送和接收消息的过程会被 Zipkin 追踪,在 Zipkin 的 UI 界面中可以查看消息的链路信息,包括消息的发送时间、接收时间、经过的服务等,有助于排查系统中的性能问题和故障。