MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

RocketMQ的客户端API使用指南

2022-06-256.3k 阅读

环境准备

在开始使用 RocketMQ 客户端 API 之前,首先要确保开发环境已经配置好相关依赖。以 Maven 项目为例,在 pom.xml 文件中添加 RocketMQ 客户端依赖:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.9.4</version>
</dependency>

注意,版本号可以根据实际情况进行调整。同时,需要确保 RocketMQ 服务端已经正确安装并启动,服务端相关的配置(如 NameServer 地址等)要准确无误,这是客户端能够正常连接服务端的基础。

生产者(Producer)

同步发送消息

同步发送是指消息发送出去后,等待服务端的响应,在收到响应之前,发送线程会一直阻塞。这种方式可靠性高,适用于对消息发送结果有较高要求的场景,比如重要的业务通知等。 以下是同步发送消息的代码示例:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class SyncProducer {
    public static void main(String[] args) throws Exception {
        // 创建一个生产者实例,并指定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("sync_producer_group");
        // 设置 NameServer 地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动生产者
        producer.start();

        for (int i = 0; i < 10; i++) {
            // 创建消息实例,指定主题、标签和消息体
            Message msg = new Message("sync_topic" /* 主题 */,
                    "sync_tag" /* 标签 */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* 消息体 */);
            // 同步发送消息,获取发送结果
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        // 关闭生产者
        producer.shutdown();
    }
}

在上述代码中,首先创建了一个 DefaultMQProducer 实例,并设置了生产者组名和 NameServer 地址。然后启动生产者,通过循环创建消息并同步发送。最后,发送完成后关闭生产者。

异步发送消息

异步发送是指消息发送出去后,不等服务端响应,发送线程就继续执行后续代码。服务端响应通过回调函数来处理。这种方式适用于对响应时间敏感,且对消息发送结果可以异步处理的场景,比如日志记录等。 代码示例如下:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("async_producer_group");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        for (int i = 0; i < 10; i++) {
            final int index = i;
            Message msg = new Message("async_topic",
                    "async_tag",
                    ("Hello RocketMQ Async " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            producer.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.printf("%d 异步发送成功, result: %s%n", index, sendResult);
                }

                @Override
                public void onException(Throwable e) {
                    System.out.printf("%d 异步发送失败, exception: %s%n", index, e);
                }
            });
        }
        Thread.sleep(1000 * 5);
        producer.shutdown();
    }
}

在这段代码中,创建生产者和消息的过程与同步发送类似。不同之处在于调用 producer.send 方法时,传入了一个 SendCallback 回调对象。在回调对象的 onSuccess 方法中处理发送成功的逻辑,onException 方法中处理发送异常的情况。由于异步发送,主线程不能立即退出,所以添加了 Thread.sleep 让主线程等待一段时间,确保异步发送操作完成后再关闭生产者。

单向发送消息

单向发送是指只负责将消息发送出去,不关心服务端的响应。这种方式性能最高,但可靠性相对较低,适用于对消息可靠性要求不高,只追求发送速度的场景,比如一些监控数据的上报等。 代码示例如下:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class OnewayProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("oneway_producer_group");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        for (int i = 0; i < 10; i++) {
            Message msg = new Message("oneway_topic",
                    "oneway_tag",
                    ("Hello RocketMQ Oneway " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            producer.sendOneway(msg);
        }
        producer.shutdown();
    }
}

代码相对简单,通过 producer.sendOneway 方法发送消息,不处理任何返回结果。在实际应用中,要充分考虑单向发送消息可能丢失的风险。

消费者(Consumer)

推模式(Push Consumer)

推模式下,RocketMQ 服务端会主动将消息推送给消费者。消费者只需注册一个监听器,当有消息到达时,监听器的 consumeMessage 方法会被调用。 代码示例如下:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class PushConsumer {
    public static void main(String[] args) throws Exception {
        // 创建一个推模式消费者实例,并指定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("push_consumer_group");
        // 设置 NameServer 地址
        consumer.setNamesrvAddr("localhost:9876");
        // 订阅主题和标签
        consumer.subscribe("push_topic", "push_tag");

        // 注册消息监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.printf("%s Receive New Messages: %s%n", Thread.currentThread().getName(), new String(msg.getBody()));
                }
                // 返回消费成功状态
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动消费者
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

在上述代码中,首先创建 DefaultMQPushConsumer 实例并设置消费者组名和 NameServer 地址。然后通过 subscribe 方法订阅主题和标签。接着注册 MessageListenerConcurrently 监听器,在 consumeMessage 方法中处理接收到的消息。最后启动消费者。

拉模式(Pull Consumer)

拉模式下,消费者主动从 RocketMQ 服务端拉取消息。这种方式相比推模式,消费者对消息获取的时机有更多的控制权。 代码示例如下:

import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.util.List;
import java.util.Set;

public class PullConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("pull_consumer_group");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.setMessageModel(MessageModel.CLUSTERING);
        consumer.start();

        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("pull_topic");
        for (MessageQueue mq : mqs) {
            System.out.printf("Consume from the queue: %s%n", mq);
            long offset = consumer.fetchConsumeOffset(mq, true);
            while (true) {
                PullResult pullResult = consumer.pullBlockIfNotFound(mq, "pull_tag", offset, 32);
                switch (pullResult.getPullStatus()) {
                    case FOUND:
                        List<MessageExt> msgList = pullResult.getMsgFoundList();
                        for (MessageExt msg : msgList) {
                            System.out.printf("%s Receive New Messages: %s%n", Thread.currentThread().getName(), new String(msg.getBody()));
                        }
                        offset = pullResult.getNextBeginOffset();
                        break;
                    case NO_NEW_MSG:
                        System.out.println("没有新消息");
                        break;
                    case NO_MATCHED_MSG:
                        System.out.println("没有匹配的消息");
                        break;
                    case OFFSET_ILLEGAL:
                        System.out.println("偏移量非法");
                        break;
                    default:
                        break;
                }
            }
        }
        consumer.shutdown();
    }
}

在这段代码中,首先创建 DefaultMQPullConsumer 实例并设置相关参数。通过 fetchSubscribeMessageQueues 获取订阅主题的消息队列,然后针对每个消息队列,先获取消费偏移量,再通过循环调用 pullBlockIfNotFound 方法拉取消息。根据拉取结果进行不同的处理,同时更新偏移量。

消息属性

自定义属性设置与获取

RocketMQ 允许为消息设置自定义属性,这些属性可以在消息的生产和消费过程中起到辅助作用。比如,可以通过设置属性来标记消息的优先级、业务类型等。 在生产者端设置属性的代码示例:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class ProducerWithProperties {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("producer_with_properties_group");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        Message msg = new Message("properties_topic",
                "properties_tag",
                ("Hello RocketMQ with properties").getBytes(RemotingHelper.DEFAULT_CHARSET));
        // 设置自定义属性
        msg.putUserProperty("priority", "high");
        msg.putUserProperty("bizType", "order");

        SendResult sendResult = producer.send(msg);
        System.out.printf("%s%n", sendResult);

        producer.shutdown();
    }
}

在消费者端获取属性的代码示例:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class ConsumerWithProperties {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_with_properties_group");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("properties_topic", "properties_tag");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.printf("%s Receive New Messages: %s%n", Thread.currentThread().getName(), new String(msg.getBody()));
                    // 获取自定义属性
                    String priority = msg.getUserProperty("priority");
                    String bizType = msg.getUserProperty("bizType");
                    System.out.printf("Priority: %s, BizType: %s%n", priority, bizType);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

在生产者端通过 msg.putUserProperty 方法设置属性,在消费者端通过 msg.getUserProperty 方法获取属性。

系统属性介绍

除了自定义属性,RocketMQ 消息还有一些系统属性。比如 MSG_KEY,可以为消息设置业务键,方便在消息查询等场景中使用。在生产者端设置 MSG_KEY 的示例:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class ProducerWithSystemProperty {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("producer_with_system_property_group");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        Message msg = new Message("system_property_topic",
                "system_property_tag",
                ("Hello RocketMQ with system property").getBytes(RemotingHelper.DEFAULT_CHARSET));
        // 设置 MSG_KEY
        msg.setKeys("order_123");

        SendResult sendResult = producer.send(msg);
        System.out.printf("%s%n", sendResult);

        producer.shutdown();
    }
}

在消费者端获取 MSG_KEY 的示例:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class ConsumerWithSystemProperty {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_with_system_property_group");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("system_property_topic", "system_property_tag");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.printf("%s Receive New Messages: %s%n", Thread.currentThread().getName(), new String(msg.getBody()));
                    // 获取 MSG_KEY
                    String keys = msg.getKeys();
                    System.out.printf("Keys: %s%n", keys);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

通过合理使用系统属性和自定义属性,可以增强消息在业务场景中的处理能力和可追溯性。

事务消息

事务消息原理

事务消息是 RocketMQ 提供的一种特殊消息类型,用于确保消息发送与本地事务的原子性。其原理是将消息发送过程分为两个阶段:Prepared 阶段和 Commit/Rollback 阶段。在 Prepared 阶段,生产者向 RocketMQ 服务端发送一条半消息(Half Message),服务端接收后返回确认。然后生产者执行本地事务,根据事务执行结果向服务端发送 Commit 或 Rollback 指令。如果服务端长时间未收到 Commit 或 Rollback 指令,会主动回调生产者的 checkLocalTransaction 方法来检查本地事务状态。

事务消息代码实现

以下是事务消息生产者的代码示例:

import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class TransactionProducer {
    public static void main(String[] args) throws Exception {
        TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_group");
        producer.setNamesrvAddr("localhost:9876");

        producer.setTransactionListener(new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                // 执行本地事务
                System.out.println("执行本地事务");
                // 这里模拟本地事务成功
                return LocalTransactionState.COMMIT_MESSAGE;
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                // 检查本地事务状态
                System.out.println("检查本地事务状态");
                // 这里模拟本地事务成功
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });

        producer.start();

        Message msg = new Message("transaction_topic",
                "transaction_tag",
                ("Hello RocketMQ Transaction").getBytes(RemotingHelper.DEFAULT_CHARSET));

        SendResult sendResult = producer.sendMessageInTransaction(msg, null);
        System.out.printf("%s%n", sendResult);

        producer.shutdown();
    }
}

在上述代码中,创建 TransactionMQProducer 实例,并设置 TransactionListener。在 executeLocalTransaction 方法中执行本地事务逻辑,返回 LocalTransactionState.COMMIT_MESSAGE 表示本地事务成功提交,LocalTransactionState.ROLLBACK_MESSAGE 表示回滚,LocalTransactionState.UNKNOW 表示状态未知。checkLocalTransaction 方法用于服务端回调检查本地事务状态。

事务消息消费者的代码与普通消费者类似,这里不再赘述。事务消息在一些涉及到分布式事务的场景中非常有用,比如电商系统中订单创建与库存扣减的原子性操作。

消息顺序性

顺序消息原理

RocketMQ 支持顺序消息,分为全局顺序和分区顺序。全局顺序是指在整个 Topic 范围内,消息严格按照发送顺序进行消费。分区顺序是指在一个消息队列内,消息按照发送顺序消费。实现顺序消息的关键在于将相关消息发送到同一个消息队列,消费者从该队列顺序消费。

顺序消息代码实现

顺序消息生产者代码示例:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.List;

public class OrderedProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("ordered_producer_group");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        String[] tags = new String[]{"ordered_tag1", "ordered_tag2", "ordered_tag3"};
        for (int i = 0; i < 10; i++) {
            int orderId = i % 3;
            Message msg = new Message("ordered_topic",
                    tags[orderId],
                    ("Hello RocketMQ Ordered " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    Integer id = (Integer) arg;
                    int index = id % mqs.size();
                    return mqs.get(index);
                }
            }, orderId);
            System.out.printf("%s%n", sendResult);
        }
        producer.shutdown();
    }
}

在上述代码中,通过 MessageQueueSelector 实现将具有相同 orderId 的消息发送到同一个消息队列。

顺序消息消费者代码示例:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class OrderedConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ordered_consumer_group");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("ordered_topic", "*");

        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.printf("%s Receive New Messages: %s%n", Thread.currentThread().getName(), new String(msg.getBody()));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

在消费者端,注册 MessageListenerOrderly 监听器来顺序消费消息。通过这种方式,可以保证在同一个消息队列内消息的顺序性。

消息过滤

基于 Tag 过滤

RocketMQ 支持基于 Tag 进行消息过滤,这是一种简单有效的过滤方式。在生产者发送消息时指定 Tag,消费者订阅时也指定相应的 Tag,只有 Tag 匹配的消息才会被消费者接收。 生产者代码示例:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class ProducerWithTagFilter {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("producer_with_tag_filter_group");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        Message msg1 = new Message("filter_topic",
                "tag1",
                ("Hello RocketMQ with tag1").getBytes(RemotingHelper.DEFAULT_CHARSET));
        Message msg2 = new Message("filter_topic",
                "tag2",
                ("Hello RocketMQ with tag2").getBytes(RemotingHelper.DEFAULT_CHARSET));

        SendResult sendResult1 = producer.send(msg1);
        SendResult sendResult2 = producer.send(msg2);

        System.out.printf("%s%n", sendResult1);
        System.out.printf("%s%n", sendResult2);

        producer.shutdown();
    }
}

消费者代码示例:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class ConsumerWithTagFilter {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_with_tag_filter_group");
        consumer.setNamesrvAddr("localhost:9876");
        // 只订阅 tag1 的消息
        consumer.subscribe("filter_topic", "tag1");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.printf("%s Receive New Messages: %s%n", Thread.currentThread().getName(), new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

在上述代码中,消费者通过 subscribe 方法指定只接收 filter_topic 主题下 tag1 的消息。

基于 SQL92 表达式过滤

RocketMQ 还支持基于 SQL92 表达式进行消息过滤,这种方式更加灵活,可以根据消息的属性进行复杂的过滤。不过需要注意的是,使用 SQL92 过滤需要在 Broker 配置文件中开启相关功能。 生产者代码示例:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class ProducerWithSqlFilter {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("producer_with_sql_filter_group");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        Message msg1 = new Message("sql_filter_topic",
                "sql_tag",
                ("Hello RocketMQ with sql filter 1").getBytes(RemotingHelper.DEFAULT_CHARSET));
        msg1.putUserProperty("age", "20");

        Message msg2 = new Message("sql_filter_topic",
                "sql_tag",
                ("Hello RocketMQ with sql filter 2").getBytes(RemotingHelper.DEFAULT_CHARSET));
        msg2.putUserProperty("age", "30");

        SendResult sendResult1 = producer.send(msg1);
        SendResult sendResult2 = producer.send(msg2);

        System.out.printf("%s%n", sendResult1);
        System.out.printf("%s%n", sendResult2);

        producer.shutdown();
    }
}

消费者代码示例:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class ConsumerWithSqlFilter {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_with_sql_filter_group");
        consumer.setNamesrvAddr("localhost:9876");
        // 使用 SQL92 表达式过滤 age > 25 的消息
        consumer.subscribe("sql_filter_topic", MessageSelector.bySql("age > 25"));

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.printf("%s Receive New Messages: %s%n", Thread.currentThread().getName(), new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

在上述代码中,生产者为消息设置 age 属性,消费者通过 MessageSelector.bySql 方法使用 SQL92 表达式过滤符合条件的消息。通过这种方式,可以实现更加精细化的消息过滤。

通过以上对 RocketMQ 客户端 API 各个方面的详细介绍和代码示例,开发者可以深入理解并灵活运用 RocketMQ 进行后端开发中的消息队列相关功能实现。无论是简单的消息发送与接收,还是复杂的事务消息、顺序消息、消息过滤等场景,RocketMQ 都提供了强大且易用的 API 支持。