RocketMQ的客户端API使用指南
环境准备
在开始使用 RocketMQ 客户端 API 之前,首先要确保开发环境已经配置好相关依赖。以 Maven 项目为例,在 pom.xml
文件中添加 RocketMQ 客户端依赖:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.4</version>
</dependency>
注意,版本号可以根据实际情况进行调整。同时,需要确保 RocketMQ 服务端已经正确安装并启动,服务端相关的配置(如 NameServer 地址等)要准确无误,这是客户端能够正常连接服务端的基础。
生产者(Producer)
同步发送消息
同步发送是指消息发送出去后,等待服务端的响应,在收到响应之前,发送线程会一直阻塞。这种方式可靠性高,适用于对消息发送结果有较高要求的场景,比如重要的业务通知等。 以下是同步发送消息的代码示例:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class SyncProducer {
public static void main(String[] args) throws Exception {
// 创建一个生产者实例,并指定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("sync_producer_group");
// 设置 NameServer 地址
producer.setNamesrvAddr("localhost:9876");
// 启动生产者
producer.start();
for (int i = 0; i < 10; i++) {
// 创建消息实例,指定主题、标签和消息体
Message msg = new Message("sync_topic" /* 主题 */,
"sync_tag" /* 标签 */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* 消息体 */);
// 同步发送消息,获取发送结果
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
// 关闭生产者
producer.shutdown();
}
}
在上述代码中,首先创建了一个 DefaultMQProducer
实例,并设置了生产者组名和 NameServer 地址。然后启动生产者,通过循环创建消息并同步发送。最后,发送完成后关闭生产者。
异步发送消息
异步发送是指消息发送出去后,不等服务端响应,发送线程就继续执行后续代码。服务端响应通过回调函数来处理。这种方式适用于对响应时间敏感,且对消息发送结果可以异步处理的场景,比如日志记录等。 代码示例如下:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class AsyncProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("async_producer_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i < 10; i++) {
final int index = i;
Message msg = new Message("async_topic",
"async_tag",
("Hello RocketMQ Async " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%d 异步发送成功, result: %s%n", index, sendResult);
}
@Override
public void onException(Throwable e) {
System.out.printf("%d 异步发送失败, exception: %s%n", index, e);
}
});
}
Thread.sleep(1000 * 5);
producer.shutdown();
}
}
在这段代码中,创建生产者和消息的过程与同步发送类似。不同之处在于调用 producer.send
方法时,传入了一个 SendCallback
回调对象。在回调对象的 onSuccess
方法中处理发送成功的逻辑,onException
方法中处理发送异常的情况。由于异步发送,主线程不能立即退出,所以添加了 Thread.sleep
让主线程等待一段时间,确保异步发送操作完成后再关闭生产者。
单向发送消息
单向发送是指只负责将消息发送出去,不关心服务端的响应。这种方式性能最高,但可靠性相对较低,适用于对消息可靠性要求不高,只追求发送速度的场景,比如一些监控数据的上报等。 代码示例如下:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class OnewayProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("oneway_producer_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i < 10; i++) {
Message msg = new Message("oneway_topic",
"oneway_tag",
("Hello RocketMQ Oneway " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.sendOneway(msg);
}
producer.shutdown();
}
}
代码相对简单,通过 producer.sendOneway
方法发送消息,不处理任何返回结果。在实际应用中,要充分考虑单向发送消息可能丢失的风险。
消费者(Consumer)
推模式(Push Consumer)
推模式下,RocketMQ 服务端会主动将消息推送给消费者。消费者只需注册一个监听器,当有消息到达时,监听器的 consumeMessage
方法会被调用。
代码示例如下:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class PushConsumer {
public static void main(String[] args) throws Exception {
// 创建一个推模式消费者实例,并指定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("push_consumer_group");
// 设置 NameServer 地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅主题和标签
consumer.subscribe("push_topic", "push_tag");
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("%s Receive New Messages: %s%n", Thread.currentThread().getName(), new String(msg.getBody()));
}
// 返回消费成功状态
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
在上述代码中,首先创建 DefaultMQPushConsumer
实例并设置消费者组名和 NameServer 地址。然后通过 subscribe
方法订阅主题和标签。接着注册 MessageListenerConcurrently
监听器,在 consumeMessage
方法中处理接收到的消息。最后启动消费者。
拉模式(Pull Consumer)
拉模式下,消费者主动从 RocketMQ 服务端拉取消息。这种方式相比推模式,消费者对消息获取的时机有更多的控制权。 代码示例如下:
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import java.util.List;
import java.util.Set;
public class PullConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("pull_consumer_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.start();
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("pull_topic");
for (MessageQueue mq : mqs) {
System.out.printf("Consume from the queue: %s%n", mq);
long offset = consumer.fetchConsumeOffset(mq, true);
while (true) {
PullResult pullResult = consumer.pullBlockIfNotFound(mq, "pull_tag", offset, 32);
switch (pullResult.getPullStatus()) {
case FOUND:
List<MessageExt> msgList = pullResult.getMsgFoundList();
for (MessageExt msg : msgList) {
System.out.printf("%s Receive New Messages: %s%n", Thread.currentThread().getName(), new String(msg.getBody()));
}
offset = pullResult.getNextBeginOffset();
break;
case NO_NEW_MSG:
System.out.println("没有新消息");
break;
case NO_MATCHED_MSG:
System.out.println("没有匹配的消息");
break;
case OFFSET_ILLEGAL:
System.out.println("偏移量非法");
break;
default:
break;
}
}
}
consumer.shutdown();
}
}
在这段代码中,首先创建 DefaultMQPullConsumer
实例并设置相关参数。通过 fetchSubscribeMessageQueues
获取订阅主题的消息队列,然后针对每个消息队列,先获取消费偏移量,再通过循环调用 pullBlockIfNotFound
方法拉取消息。根据拉取结果进行不同的处理,同时更新偏移量。
消息属性
自定义属性设置与获取
RocketMQ 允许为消息设置自定义属性,这些属性可以在消息的生产和消费过程中起到辅助作用。比如,可以通过设置属性来标记消息的优先级、业务类型等。 在生产者端设置属性的代码示例:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class ProducerWithProperties {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("producer_with_properties_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message("properties_topic",
"properties_tag",
("Hello RocketMQ with properties").getBytes(RemotingHelper.DEFAULT_CHARSET));
// 设置自定义属性
msg.putUserProperty("priority", "high");
msg.putUserProperty("bizType", "order");
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
producer.shutdown();
}
}
在消费者端获取属性的代码示例:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class ConsumerWithProperties {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_with_properties_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("properties_topic", "properties_tag");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("%s Receive New Messages: %s%n", Thread.currentThread().getName(), new String(msg.getBody()));
// 获取自定义属性
String priority = msg.getUserProperty("priority");
String bizType = msg.getUserProperty("bizType");
System.out.printf("Priority: %s, BizType: %s%n", priority, bizType);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
在生产者端通过 msg.putUserProperty
方法设置属性,在消费者端通过 msg.getUserProperty
方法获取属性。
系统属性介绍
除了自定义属性,RocketMQ 消息还有一些系统属性。比如 MSG_KEY
,可以为消息设置业务键,方便在消息查询等场景中使用。在生产者端设置 MSG_KEY
的示例:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class ProducerWithSystemProperty {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("producer_with_system_property_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message("system_property_topic",
"system_property_tag",
("Hello RocketMQ with system property").getBytes(RemotingHelper.DEFAULT_CHARSET));
// 设置 MSG_KEY
msg.setKeys("order_123");
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
producer.shutdown();
}
}
在消费者端获取 MSG_KEY
的示例:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class ConsumerWithSystemProperty {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_with_system_property_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("system_property_topic", "system_property_tag");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("%s Receive New Messages: %s%n", Thread.currentThread().getName(), new String(msg.getBody()));
// 获取 MSG_KEY
String keys = msg.getKeys();
System.out.printf("Keys: %s%n", keys);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
通过合理使用系统属性和自定义属性,可以增强消息在业务场景中的处理能力和可追溯性。
事务消息
事务消息原理
事务消息是 RocketMQ 提供的一种特殊消息类型,用于确保消息发送与本地事务的原子性。其原理是将消息发送过程分为两个阶段:Prepared 阶段和 Commit/Rollback 阶段。在 Prepared 阶段,生产者向 RocketMQ 服务端发送一条半消息(Half Message),服务端接收后返回确认。然后生产者执行本地事务,根据事务执行结果向服务端发送 Commit 或 Rollback 指令。如果服务端长时间未收到 Commit 或 Rollback 指令,会主动回调生产者的 checkLocalTransaction
方法来检查本地事务状态。
事务消息代码实现
以下是事务消息生产者的代码示例:
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class TransactionProducer {
public static void main(String[] args) throws Exception {
TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_group");
producer.setNamesrvAddr("localhost:9876");
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 执行本地事务
System.out.println("执行本地事务");
// 这里模拟本地事务成功
return LocalTransactionState.COMMIT_MESSAGE;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 检查本地事务状态
System.out.println("检查本地事务状态");
// 这里模拟本地事务成功
return LocalTransactionState.COMMIT_MESSAGE;
}
});
producer.start();
Message msg = new Message("transaction_topic",
"transaction_tag",
("Hello RocketMQ Transaction").getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
producer.shutdown();
}
}
在上述代码中,创建 TransactionMQProducer
实例,并设置 TransactionListener
。在 executeLocalTransaction
方法中执行本地事务逻辑,返回 LocalTransactionState.COMMIT_MESSAGE
表示本地事务成功提交,LocalTransactionState.ROLLBACK_MESSAGE
表示回滚,LocalTransactionState.UNKNOW
表示状态未知。checkLocalTransaction
方法用于服务端回调检查本地事务状态。
事务消息消费者的代码与普通消费者类似,这里不再赘述。事务消息在一些涉及到分布式事务的场景中非常有用,比如电商系统中订单创建与库存扣减的原子性操作。
消息顺序性
顺序消息原理
RocketMQ 支持顺序消息,分为全局顺序和分区顺序。全局顺序是指在整个 Topic 范围内,消息严格按照发送顺序进行消费。分区顺序是指在一个消息队列内,消息按照发送顺序消费。实现顺序消息的关键在于将相关消息发送到同一个消息队列,消费者从该队列顺序消费。
顺序消息代码实现
顺序消息生产者代码示例:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.List;
public class OrderedProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("ordered_producer_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
String[] tags = new String[]{"ordered_tag1", "ordered_tag2", "ordered_tag3"};
for (int i = 0; i < 10; i++) {
int orderId = i % 3;
Message msg = new Message("ordered_topic",
tags[orderId],
("Hello RocketMQ Ordered " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}
}
在上述代码中,通过 MessageQueueSelector
实现将具有相同 orderId
的消息发送到同一个消息队列。
顺序消息消费者代码示例:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class OrderedConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ordered_consumer_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("ordered_topic", "*");
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("%s Receive New Messages: %s%n", Thread.currentThread().getName(), new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
在消费者端,注册 MessageListenerOrderly
监听器来顺序消费消息。通过这种方式,可以保证在同一个消息队列内消息的顺序性。
消息过滤
基于 Tag 过滤
RocketMQ 支持基于 Tag 进行消息过滤,这是一种简单有效的过滤方式。在生产者发送消息时指定 Tag,消费者订阅时也指定相应的 Tag,只有 Tag 匹配的消息才会被消费者接收。 生产者代码示例:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class ProducerWithTagFilter {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("producer_with_tag_filter_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg1 = new Message("filter_topic",
"tag1",
("Hello RocketMQ with tag1").getBytes(RemotingHelper.DEFAULT_CHARSET));
Message msg2 = new Message("filter_topic",
"tag2",
("Hello RocketMQ with tag2").getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult1 = producer.send(msg1);
SendResult sendResult2 = producer.send(msg2);
System.out.printf("%s%n", sendResult1);
System.out.printf("%s%n", sendResult2);
producer.shutdown();
}
}
消费者代码示例:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class ConsumerWithTagFilter {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_with_tag_filter_group");
consumer.setNamesrvAddr("localhost:9876");
// 只订阅 tag1 的消息
consumer.subscribe("filter_topic", "tag1");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("%s Receive New Messages: %s%n", Thread.currentThread().getName(), new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
在上述代码中,消费者通过 subscribe
方法指定只接收 filter_topic
主题下 tag1
的消息。
基于 SQL92 表达式过滤
RocketMQ 还支持基于 SQL92 表达式进行消息过滤,这种方式更加灵活,可以根据消息的属性进行复杂的过滤。不过需要注意的是,使用 SQL92 过滤需要在 Broker 配置文件中开启相关功能。 生产者代码示例:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class ProducerWithSqlFilter {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("producer_with_sql_filter_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg1 = new Message("sql_filter_topic",
"sql_tag",
("Hello RocketMQ with sql filter 1").getBytes(RemotingHelper.DEFAULT_CHARSET));
msg1.putUserProperty("age", "20");
Message msg2 = new Message("sql_filter_topic",
"sql_tag",
("Hello RocketMQ with sql filter 2").getBytes(RemotingHelper.DEFAULT_CHARSET));
msg2.putUserProperty("age", "30");
SendResult sendResult1 = producer.send(msg1);
SendResult sendResult2 = producer.send(msg2);
System.out.printf("%s%n", sendResult1);
System.out.printf("%s%n", sendResult2);
producer.shutdown();
}
}
消费者代码示例:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class ConsumerWithSqlFilter {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_with_sql_filter_group");
consumer.setNamesrvAddr("localhost:9876");
// 使用 SQL92 表达式过滤 age > 25 的消息
consumer.subscribe("sql_filter_topic", MessageSelector.bySql("age > 25"));
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("%s Receive New Messages: %s%n", Thread.currentThread().getName(), new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
在上述代码中,生产者为消息设置 age
属性,消费者通过 MessageSelector.bySql
方法使用 SQL92 表达式过滤符合条件的消息。通过这种方式,可以实现更加精细化的消息过滤。
通过以上对 RocketMQ 客户端 API 各个方面的详细介绍和代码示例,开发者可以深入理解并灵活运用 RocketMQ 进行后端开发中的消息队列相关功能实现。无论是简单的消息发送与接收,还是复杂的事务消息、顺序消息、消息过滤等场景,RocketMQ 都提供了强大且易用的 API 支持。