RocketMQ API使用详解与最佳实践
RocketMQ API 基础概念
RocketMQ 是一款分布式消息中间件,具有高吞吐量、高可用性、适合大规模分布式系统等特点。在深入探讨其 API 使用之前,先了解一些关键概念。
- Producer:消息生产者,负责发送消息到 RocketMQ 服务器。生产者可以是应用系统中的业务逻辑模块,例如电商系统中的订单创建模块,当一个新订单生成时,该模块作为生产者向 RocketMQ 发送订单相关消息。
- Consumer:消息消费者,从 RocketMQ 服务器接收消息并进行相应处理。比如在电商系统中,库存管理模块可以作为消费者,接收订单创建消息后进行库存扣减等操作。
- Topic:主题,是消息的逻辑分类。不同类型的消息可以发送到不同的 Topic,例如电商系统中,订单相关消息可以发送到 “order - topic”,物流相关消息可以发送到 “logistics - topic”。
- Queue:队列,是 Topic 的物理分区。一个 Topic 可以包含多个 Queue,通过设置多个 Queue 可以提高消息的并行处理能力。例如,“order - topic” 可以设置 4 个 Queue,当有大量订单消息时,不同的消费者可以并行从不同的 Queue 中拉取消息进行处理。
环境搭建
在使用 RocketMQ API 之前,需要搭建 RocketMQ 运行环境。这里以 Linux 环境为例进行说明。
- 下载 RocketMQ:从 RocketMQ 官方网站下载最新版本的安装包,例如 rocketmq - all - 4.9.4 - bin - release.zip。
- 解压安装包:使用命令
unzip rocketmq - all - 4.9.4 - bin - release.zip
解压安装包到指定目录。 - 启动 NameServer:进入 RocketMQ 解压目录,执行
nohup sh bin/mqnamesrv &
启动 NameServer。NameServer 是 RocketMQ 的名称服务,负责保存 Topic 和 Broker 的映射关系。 - 启动 Broker:在启动 NameServer 后,执行
nohup sh bin/mqbroker - n localhost:9876 &
启动 Broker。Broker 负责存储和转发消息。
发送消息
同步发送
同步发送是指消息发送后,生产者会等待 RocketMQ 服务器的响应,只有收到响应后才会继续执行后续代码。这种方式适用于对消息发送可靠性要求较高的场景,例如订单创建消息,必须确保消息成功发送到 RocketMQ 服务器。
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class SyncProducer {
public static void main(String[] args) throws Exception {
// 创建一个生产者实例
DefaultMQProducer producer = new DefaultMQProducer("group1");
// 设置 NameServer 地址
producer.setNamesrvAddr("localhost:9876");
// 启动生产者
producer.start();
for (int i = 0; i < 10; i++) {
// 创建消息实例,指定 Topic、Tag 和消息体
Message message = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes("UTF - 8"));
// 同步发送消息,等待服务器响应
SendResult sendResult = producer.send(message);
System.out.printf("%s%n", sendResult);
}
// 关闭生产者
producer.shutdown();
}
}
在上述代码中,首先创建了一个 DefaultMQProducer
实例,并指定生产者组为 “group1”。然后设置 NameServer 地址,启动生产者。在循环中,创建消息并同步发送,最后打印发送结果。发送完成后,关闭生产者。
异步发送
异步发送是指消息发送后,生产者不会等待服务器响应,而是继续执行后续代码。当服务器响应到达时,通过回调函数进行处理。这种方式适用于对消息发送性能要求较高,且允许一定概率消息发送失败的场景,例如日志消息的发送。
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class AsyncProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group2");
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i < 10; i++) {
final int index = i;
Message message = new Message("TopicTest", "TagB", ("Hello RocketMQ Async " + i).getBytes("UTF - 8"));
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%d Send message success: %s%n", index, sendResult);
}
@Override
public void onException(Throwable e) {
System.out.printf("%d Send message failed: %s%n", index, e);
}
});
}
Thread.sleep(10000);
producer.shutdown();
}
}
在这段代码中,同样创建了生产者实例并启动。在发送消息时,通过 producer.send(message, new SendCallback())
方法传入回调函数。当消息发送成功时,onSuccess
方法会被调用;当发送失败时,onException
方法会被调用。最后通过 Thread.sleep(10000)
等待一段时间,确保异步回调函数有足够时间执行,然后关闭生产者。
单向发送
单向发送是指消息发送后,生产者既不等待服务器响应,也不通过回调函数处理响应。这种方式适用于对消息可靠性要求较低,且对性能要求极高的场景,例如一些监控数据的上报。
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
public class OnewayProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group3");
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i < 10; i++) {
Message message = new Message("TopicTest", "TagC", ("Hello RocketMQ Oneway " + i).getBytes("UTF - 8"));
producer.sendOneway(message);
}
producer.shutdown();
}
}
此代码创建生产者并启动后,在循环中使用 producer.sendOneway(message)
方法单向发送消息,发送完成后关闭生产者。由于不关心发送结果,这种方式性能最高,但可能会丢失消息。
接收消息
推模式(PushConsumer)
推模式下,RocketMQ 服务器会主动将消息推送给消费者。消费者通过注册监听器的方式来处理接收到的消息。
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class PushConsumer {
public static void main(String[] args) throws Exception {
// 创建一个消费者实例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group4");
// 设置 NameServer 地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅 Topic 和 Tag
consumer.subscribe("TopicTest", "TagA || TagB");
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
System.out.println("Consumer Started.");
}
}
在上述代码中,创建了 DefaultMQPushConsumer
实例,并指定消费者组为 “group4”。设置 NameServer 地址后,通过 consumer.subscribe("TopicTest", "TagA || TagB")
订阅 “TopicTest” 主题下的 “TagA” 和 “TagB” 消息。接着注册消息监听器,在监听器的 consumeMessage
方法中处理接收到的消息。最后启动消费者。
拉模式(PullConsumer)
拉模式下,消费者主动从 RocketMQ 服务器拉取消息。这种方式可以让消费者更灵活地控制消息拉取的时机和频率。
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import java.util.List;
import java.util.Set;
public class PullConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("group5");
consumer.setNamesrvAddr("localhost:9876");
consumer.start();
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest");
for (MessageQueue mq : mqs) {
System.out.printf("Consume from the queue: %s%n", mq);
long offset = consumer.fetchConsumeOffset(mq, true);
while (true) {
PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, offset, 32);
switch (pullResult.getPullStatus()) {
case FOUND:
List<MessageExt> msgList = pullResult.getMsgFoundList();
for (MessageExt msg : msgList) {
System.out.printf("Receive New Messages: %s %n", msg);
}
offset = pullResult.getNextBeginOffset();
break;
case NO_NEW_MSG:
break;
case NO_MATCHED_MSG:
break;
case OFFSET_ILLEGAL:
break;
default:
break;
}
}
}
consumer.shutdown();
}
}
代码中创建 DefaultMQPullConsumer
实例并启动。通过 consumer.fetchSubscribeMessageQueues("TopicTest")
获取 “TopicTest” 主题的所有队列。然后在每个队列上通过 consumer.pullBlockIfNotFound
方法拉取消息,根据拉取结果进行相应处理,最后关闭消费者。
高级特性与最佳实践
消息顺序性
在某些场景下,例如订单处理流程,消息的顺序至关重要。RocketMQ 支持局部顺序消息,即保证在一个队列内消息的顺序性。
- 顺序发送:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import java.util.List;
public class OrderedProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group6");
producer.setNamesrvAddr("localhost:9876");
producer.start();
List<MessageQueue> mqs = producer.fetchPublishMessageQueues("OrderTopic");
int queueIndex = 0;
for (int i = 0; i < 10; i++) {
String orderId = "Order_001";
int queue = Math.abs(orderId.hashCode() % mqs.size());
Message message = new Message("OrderTopic", ("Order Message " + i).getBytes("UTF - 8"));
SendResult sendResult = producer.send(message, mqs.get(queue));
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}
}
在上述代码中,通过 producer.fetchPublishMessageQueues("OrderTopic")
获取 “OrderTopic” 的队列列表。根据订单 ID 的哈希值选择队列,确保相同订单 ID 的消息发送到同一个队列,从而保证顺序。
- 顺序消费:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class OrderedConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group7");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("OrderTopic", "*");
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}
在消费者端,通过注册 MessageListenerOrderly
监听器,RocketMQ 会保证在一个队列内消息按顺序消费。
消息过滤
RocketMQ 支持通过 Tag 和 SQL92 表达式进行消息过滤。
- Tag 过滤:在前面的推模式和拉模式示例中,已经展示了通过 Tag 进行消息过滤,例如
consumer.subscribe("TopicTest", "TagA || TagB")
,只接收 “TopicTest” 主题下 “TagA” 和 “TagB” 的消息。 - SQL92 过滤: 首先在生产者发送消息时设置属性:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class SqlProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group8");
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i < 10; i++) {
Message message = new Message("SqlTopic", ("SQL Message " + i).getBytes("UTF - 8"));
message.putUserProperty("age", String.valueOf(i));
SendResult sendResult = producer.send(message);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}
}
然后在消费者端使用 SQL92 表达式过滤:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class SqlConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group9");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("SqlTopic", "age > 5");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}
在上述代码中,生产者在消息中设置了 “age” 属性,消费者通过 consumer.subscribe("SqlTopic", "age > 5")
过滤出 “age” 属性大于 5 的消息。
事务消息
在一些分布式事务场景中,需要保证消息发送与业务操作的原子性。RocketMQ 提供了事务消息功能。
- 事务生产者:
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
public class TransactionProducer {
public static void main(String[] args) throws Exception {
TransactionMQProducer producer = new TransactionMQProducer("group10");
producer.setNamesrvAddr("localhost:9876");
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 执行本地业务逻辑
System.out.println("Execute local transaction: " + new String(msg.getBody()));
return LocalTransactionState.COMMIT_MESSAGE;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 检查本地事务状态
System.out.println("Check local transaction: " + new String(msg.getBody()));
return LocalTransactionState.COMMIT_MESSAGE;
}
});
producer.start();
Message message = new Message("TransactionTopic", ("Transaction Message").getBytes("UTF - 8"));
SendResult sendResult = producer.sendMessageInTransaction(message, null);
System.out.printf("%s%n", sendResult);
producer.shutdown();
}
}
在上述代码中,创建 TransactionMQProducer
实例,并设置事务监听器。在 executeLocalTransaction
方法中执行本地业务逻辑,根据业务执行结果返回 LocalTransactionState.COMMIT_MESSAGE
(提交消息)、LocalTransactionState.ROLLBACK_MESSAGE
(回滚消息)或 LocalTransactionState.UNKNOW
(未知状态,RocketMQ 会进行事务状态检查)。在 checkLocalTransaction
方法中检查本地事务状态。
- 事务消费者: 事务消费者与普通消费者类似,只是处理事务消息时,需要确保幂等性,防止重复消费。
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class TransactionConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group11");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TransactionTopic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}
在消费者端,通过正常的消息监听器处理事务消息,在处理业务逻辑时,通过检查业务状态等方式确保幂等性。
高可用性与负载均衡
- NameServer 高可用性:NameServer 本身是无状态的,可以部署多个 NameServer 实例,生产者和消费者通过配置多个 NameServer 地址来实现高可用性,例如
producer.setNamesrvAddr("localhost:9876;localhost:9877")
。 - Broker 高可用性:RocketMQ 支持 Master - Slave 架构,Master 负责处理读写请求,Slave 从 Master 同步数据。当 Master 出现故障时,Slave 可以切换为 Master 继续提供服务。
- 负载均衡:
- 生产者负载均衡:生产者在发送消息时,会根据负载均衡策略选择队列。默认的负载均衡策略是轮询,即依次选择队列发送消息。可以通过实现
MessageQueueSelector
接口来自定义负载均衡策略。 - 消费者负载均衡:消费者组内的消费者会自动进行负载均衡,分配不同的队列给不同的消费者进行消费。如果新增或减少消费者,RocketMQ 会自动重新分配队列。
- 生产者负载均衡:生产者在发送消息时,会根据负载均衡策略选择队列。默认的负载均衡策略是轮询,即依次选择队列发送消息。可以通过实现
性能优化
- 批量发送消息:生产者可以批量发送消息,减少网络开销。例如:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import java.util.ArrayList;
import java.util.List;
public class BatchProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group12");
producer.setNamesrvAddr("localhost:9876");
producer.start();
List<Message> messages = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Message message = new Message("BatchTopic", ("Batch Message " + i).getBytes("UTF - 8"));
messages.add(message);
}
SendResult sendResult = producer.send(messages);
System.out.printf("%s%n", sendResult);
producer.shutdown();
}
}
在上述代码中,创建了一个消息列表,将多个消息添加到列表中,然后通过 producer.send(messages)
批量发送消息。
- 合理设置线程池:消费者在处理消息时,可以根据业务需求合理设置线程池大小。例如,在
DefaultMQPushConsumer
中,可以通过consumer.setConsumeThreadMin(int)
和consumer.setConsumeThreadMax(int)
方法设置最小和最大消费线程数。 - 优化消息存储:可以通过调整 RocketMQ 的存储配置,如刷盘策略(同步刷盘、异步刷盘)、文件存储路径等,来提高消息存储性能。同步刷盘保证数据的可靠性,但性能相对较低;异步刷盘性能较高,但可能会丢失少量数据。
常见问题与解决方法
- 消息发送失败:
- 原因:可能是 NameServer 地址配置错误、网络问题、Topic 不存在等。
- 解决方法:检查 NameServer 地址是否正确,确保网络连接正常,通过 RocketMQ 控制台或命令行工具检查 Topic 是否存在。
- 消息消费失败:
- 原因:可能是消费逻辑异常、消息格式错误等。
- 解决方法:在消费监听器中捕获异常,打印详细的错误信息,检查消息格式是否符合业务要求。
- 队列负载不均衡:
- 原因:可能是消费者数量与队列数量不匹配、消费者负载能力差异较大等。
- 解决方法:根据业务流量合理调整消费者数量和队列数量,确保消费者负载能力相近。可以通过监控工具查看队列的消费进度和消费者的负载情况,进行相应调整。
通过深入理解和合理运用 RocketMQ 的 API,结合上述最佳实践和性能优化方法,可以构建出高效、可靠的分布式消息系统,满足各种复杂业务场景的需求。在实际应用中,还需要根据具体业务需求和系统环境进行适当的调整和优化。