MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

RocketMQ消息过滤机制与标签路由

2022-01-243.4k 阅读

RocketMQ消息过滤机制概述

在分布式系统中,消息队列扮演着至关重要的角色,它能够解耦系统组件、削峰填谷以及异步处理消息。RocketMQ作为一款高性能、高可靠的分布式消息队列,其消息过滤机制为开发者提供了一种灵活处理消息的方式。消息过滤是指在消息消费端,根据一定的规则筛选出符合条件的消息进行处理,避免对无关消息进行无效处理,从而提高系统的效率和资源利用率。

RocketMQ支持两种消息过滤方式:基于标签(Tag)的过滤和基于SQL表达式的过滤。基于标签的过滤是一种简单且常用的方式,而基于SQL表达式的过滤则更为灵活,可以满足复杂的业务需求。

基于标签(Tag)的过滤

标签的定义与使用

在RocketMQ中,标签是消息的一个属性,用于对消息进行分类。生产者在发送消息时,可以为消息指定一个或多个标签。例如,我们有一个电商系统,其中有订单相关的消息,我们可以为不同类型的订单消息设置不同的标签,如“new_order”表示新订单,“paid_order”表示已支付订单等。

在Java代码中,发送带有标签的消息示例如下:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class ProducerWithTag {
    public static void main(String[] args) throws Exception {
        // 创建消息生产者实例
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        // 设置NameServer地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动生产者
        producer.start();

        // 创建消息,指定主题、标签和消息体
        Message message = new Message("TopicTest", "new_order", "新订单消息内容".getBytes("UTF-8"));
        // 发送消息
        SendResult sendResult = producer.send(message);
        System.out.printf("%s%n", sendResult);

        // 关闭生产者
        producer.shutdown();
    }
}

在上述代码中,我们创建了一条主题为“TopicTest”,标签为“new_order”的消息。

消费者基于标签的过滤

消费者在订阅消息时,可以指定需要接收的标签。这样,只有标签匹配的消息才会被消费者接收并处理。例如:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class ConsumerWithTag {
    public static void main(String[] args) throws Exception {
        // 创建消息消费者实例
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group2");
        // 设置NameServer地址
        consumer.setNamesrvAddr("localhost:9876");
        // 订阅主题和标签
        consumer.subscribe("TopicTest", "new_order");

        // 注册消息监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("收到消息:" + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 启动消费者
        consumer.start();
        System.out.println("消费者已启动");
    }
}

在这段代码中,消费者订阅了主题“TopicTest”且标签为“new_order”的消息。只有满足此条件的消息才会被该消费者处理。

标签过滤的原理

RocketMQ的Broker在存储消息时,会将消息的主题和标签等信息进行索引。当消费者订阅主题和标签时,Broker根据消费者的订阅信息,从存储的消息中筛选出符合条件的消息推送给消费者。这种基于标签的过滤方式实现简单,性能较高,适用于大多数场景。但它的局限性在于,标签只能是简单的字符串,无法表达复杂的过滤逻辑。

基于SQL表达式的过滤

SQL表达式过滤的优势

基于SQL表达式的过滤弥补了标签过滤的不足,它允许开发者使用SQL语法来定义复杂的过滤规则。例如,我们可以根据消息的属性值进行比较、逻辑运算等操作。在电商系统中,我们可能需要根据订单金额、下单时间等属性来过滤订单消息,SQL表达式过滤就能很好地满足这类需求。

启用SQL表达式过滤

要使用SQL表达式过滤,首先需要在Broker配置文件中启用SQL过滤功能。在broker.conf文件中,添加如下配置:

enablePropertyFilter = true

重启Broker后,SQL表达式过滤功能就启用了。

生产者设置消息属性

在发送消息时,生产者需要为消息设置相关的属性,以便在SQL表达式中使用。例如,我们为订单消息设置订单金额和下单时间属性:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class ProducerWithSQLFilter {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("group3");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        Message message = new Message("TopicTest", "订单消息".getBytes("UTF-8"));
        // 设置订单金额属性
        message.putUserProperty("order_amount", "100");
        // 设置下单时间属性
        message.putUserProperty("order_time", "20231001120000");

        SendResult sendResult = producer.send(message);
        System.out.printf("%s%n", sendResult);

        producer.shutdown();
    }
}

在上述代码中,我们为消息设置了“order_amount”和“order_time”两个属性。

消费者使用SQL表达式订阅

消费者在订阅消息时,可以使用SQL表达式来定义过滤规则。例如,我们只接收订单金额大于50的消息:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class ConsumerWithSQLFilter {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group4");
        consumer.setNamesrvAddr("localhost:9876");
        // 使用SQL表达式订阅消息
        consumer.subscribe("TopicTest", MessageSelector.bySql("order_amount > 50"));

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("收到消息:" + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("消费者已启动");
    }
}

在这段代码中,消费者使用MessageSelector.bySql("order_amount > 50")来订阅订单金额大于50的消息。

SQL表达式语法支持

RocketMQ支持的SQL表达式语法包括基本的比较运算符(如><=!=等)、逻辑运算符(如ANDORNOT)以及字符串匹配函数(如LIKE)等。例如,我们可以使用如下复杂的SQL表达式:

order_amount > 50 AND order_time LIKE '202310%'

此表达式表示筛选出订单金额大于50且下单时间在2023年10月的消息。

标签路由

标签路由的概念

标签路由是基于标签的一种消息路由策略。在RocketMQ中,生产者通过标签将消息发送到特定的队列,而消费者根据自身需求订阅相应标签的消息队列,从而实现消息的精准路由。标签路由可以提高消息处理的效率,使得不同类型的消息能够被不同的消费者或消费组高效处理。

生产者基于标签路由消息

生产者在发送消息时,可以根据标签将消息发送到不同的队列。例如,我们可以根据订单的紧急程度设置不同的标签,将紧急订单消息发送到特定的队列。以下是代码示例:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

import java.util.List;

public class ProducerWithTagRouting {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("group5");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        // 获取主题的队列信息
        List<MessageQueue> messageQueues = producer.fetchPublishMessageQueues("TopicTest");

        // 发送紧急订单消息到特定队列
        Message urgentOrderMessage = new Message("TopicTest", "urgent_order", "紧急订单消息内容".getBytes("UTF-8"));
        int urgentQueueIndex = 0; // 假设将紧急订单发送到第一个队列
        producer.send(urgentOrderMessage, messageQueues.get(urgentQueueIndex));

        // 发送普通订单消息到其他队列
        Message normalOrderMessage = new Message("TopicTest", "normal_order", "普通订单消息内容".getBytes("UTF-8"));
        int normalQueueIndex = 1; // 假设将普通订单发送到第二个队列
        producer.send(normalOrderMessage, messageQueues.get(normalQueueIndex));

        producer.shutdown();
    }
}

在上述代码中,我们根据订单的紧急程度标签,将紧急订单消息和普通订单消息发送到不同的队列。

消费者基于标签路由消费

消费者在订阅消息时,可以根据标签选择消费特定队列的消息。例如,处理紧急订单的消费者可以订阅紧急订单标签对应的队列。代码示例如下:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;

import java.util.ArrayList;
import java.util.List;

public class ConsumerWithTagRouting {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group6");
        consumer.setNamesrvAddr("localhost:9876");

        // 只订阅紧急订单标签对应的队列
        List<MessageQueue> urgentMessageQueues = new ArrayList<>();
        // 假设紧急订单队列是第一个队列
        urgentMessageQueues.add(new MessageQueue("TopicTest", 0));
        consumer.assign(urgentMessageQueues);
        consumer.subscribe("TopicTest", "urgent_order");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("处理紧急订单消息:" + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("紧急订单消费者已启动");
    }
}

在这段代码中,消费者通过assign方法指定只消费紧急订单标签对应的队列中的消息。

标签路由的优势与应用场景

标签路由的优势在于能够实现消息的精细化管理和处理。在实际应用中,对于一些对消息处理有不同优先级或不同处理逻辑的场景,标签路由非常适用。例如,在电商系统中,对于高价值订单、促销订单等不同类型的订单消息,可以通过标签路由将它们发送到不同的队列,由不同的消费者或消费组进行处理,从而提高整个系统的处理效率和性能。

消息过滤与标签路由的结合使用

在实际的分布式系统中,往往需要将消息过滤和标签路由结合起来使用,以满足复杂的业务需求。例如,在一个大型的物流系统中,我们可能有多种类型的物流消息,如包裹揽收消息、运输中消息、派送消息等,同时这些消息可能还包含一些属性,如包裹重量、目的地等。

我们可以先通过标签将不同类型的物流消息进行初步分类,比如使用“pickup”标签表示包裹揽收消息,“in_transit”标签表示运输中消息等。然后,对于某些特定需求,再使用SQL表达式过滤进一步筛选消息。例如,我们可能只关心重量大于10kg且目的地为特定城市的运输中消息。

代码示例

生产者发送消息时,设置标签和属性:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class ProducerCombined {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("group7");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        Message message = new Message("LogisticsTopic", "in_transit", "运输中消息内容".getBytes("UTF-8"));
        message.putUserProperty("package_weight", "15");
        message.putUserProperty("destination", "Shanghai");

        SendResult sendResult = producer.send(message);
        System.out.printf("%s%n", sendResult);

        producer.shutdown();
    }
}

消费者结合标签和SQL表达式订阅消息:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class ConsumerCombined {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group8");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("LogisticsTopic", MessageSelector.bySql("package_weight > 10 AND destination = 'Shanghai' AND TAGS = 'in_transit'"));

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("收到符合条件的消息:" + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("消费者已启动");
    }
}

在上述代码中,生产者发送了带有“in_transit”标签和相关属性的消息,消费者通过结合标签和SQL表达式,只接收重量大于10kg且目的地为“Shanghai”的运输中消息。

通过这种方式,我们可以充分利用RocketMQ的消息过滤机制和标签路由功能,构建出更加灵活、高效的分布式消息处理系统,满足不同业务场景下对消息处理的复杂需求。

注意事项与性能优化

注意事项

  1. SQL表达式性能:虽然SQL表达式过滤提供了强大的功能,但由于其需要对消息的属性进行解析和计算,相比基于标签的过滤,性能会有所下降。因此,在使用SQL表达式时,应尽量避免复杂的计算和函数调用,以提高过滤效率。
  2. 属性命名规范:在设置消息属性时,应遵循一定的命名规范,避免使用与RocketMQ内部保留字冲突的名称。同时,属性名称应具有一定的可读性,便于理解和维护。
  3. 标签和SQL表达式的兼容性:在结合使用标签和SQL表达式时,要确保两者的逻辑是一致的,避免出现矛盾的过滤规则,导致消息无法正确接收。

性能优化

  1. 合理设置队列数量:根据系统的负载和消息处理能力,合理设置主题的队列数量。过少的队列可能导致消息处理速度慢,而过多的队列则会增加系统资源消耗。
  2. 批量操作:生产者可以采用批量发送消息的方式,减少网络开销,提高发送效率。消费者也可以采用批量消费的方式,一次性处理多条消息,提高消费效率。
  3. 异步处理:在消费者处理消息时,可以采用异步处理的方式,将消息处理逻辑放入线程池中执行,避免阻塞消息消费线程,提高系统的并发处理能力。

通过注意这些事项并进行性能优化,可以充分发挥RocketMQ消息过滤机制和标签路由的优势,构建出高性能、高可靠的分布式消息系统。