MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

RocketMQ延迟消息与定时任务实现

2022-06-012.9k 阅读

RocketMQ 延迟消息概述

RocketMQ 是一款高性能、高可靠的分布式消息队列。在许多业务场景中,我们需要消息不是立即被消费,而是在指定的一段时间后才被消费,这就涉及到延迟消息的应用。

RocketMQ 支持延迟消息的特性,允许生产者发送延迟消息,消费者在消息延迟时间到达后才能消费该消息。它的延迟级别是预先定义好的,默认情况下,RocketMQ 提供了 18 个延迟级别,分别为 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。

延迟消息应用场景

  1. 订单超时处理:在电商系统中,用户下单后,如果在一定时间内未完成支付,系统需要自动取消订单并释放库存。例如,用户下单后 30 分钟内未支付,此时可以发送一条延迟 30 分钟的消息,消息到达延迟时间后,消费者接收消息并执行取消订单和释放库存的操作。
  2. 消息重试:当消息消费失败时,有时不希望立即重试,而是延迟一段时间后重试。比如网络波动导致消息消费失败,延迟几分钟后重试,可能网络就恢复正常了,这样可以避免不必要的重复尝试。
  3. 定时提醒:像会议提醒、生日提醒等场景,提前设置好提醒时间,发送延迟消息,到时间后消费者触发提醒逻辑。

RocketMQ 延迟消息原理

  1. 消息发送:生产者在发送消息时,通过设置消息的 delayTimeLevel 属性来指定延迟级别。例如,如果想延迟 5 分钟,就设置对应的延迟级别。
  2. Broker 处理:Broker 收到消息后,并不会将其直接放入可消费队列。而是根据延迟级别,将消息暂存到一个延迟队列中。这个延迟队列是基于时间轮算法实现的。时间轮算法可以高效地管理定时任务,它将时间划分为一个个时间格,每个时间格代表一定的时间跨度。消息会被分配到对应的时间格中,当时间轮转动到该时间格时,消息就会被投递到可消费队列中。
  3. 消息消费:消费者从可消费队列中获取消息并进行消费,此时消息已经到达延迟时间。

代码示例 - 发送延迟消息

以下是使用 Java 语言,基于 RocketMQ 客户端发送延迟消息的示例代码:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class DelayMessageProducer {
    public static void main(String[] args) throws Exception {
        // 创建生产者实例
        DefaultMQProducer producer = new DefaultMQProducer("DelayProducerGroup");
        // 设置 NameServer 地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动生产者
        producer.start();

        for (int i = 0; i < 10; i++) {
            // 创建消息实例,指定主题、标签和消息体
            Message message = new Message("DelayTopic", "TagA", ("Hello RocketMQ " + i).getBytes("UTF-8"));
            // 设置延迟级别,这里设置为延迟 5 分钟(对应延迟级别 9)
            message.setDelayTimeLevel(9);
            // 发送消息
            SendResult sendResult = producer.send(message);
            System.out.printf("%s%n", sendResult);
        }

        // 关闭生产者
        producer.shutdown();
    }
}

在上述代码中,我们创建了一个 DefaultMQProducer 实例,并设置了生产者组和 NameServer 地址。然后,循环发送 10 条消息,每条消息都设置了延迟级别为 9(对应 5 分钟延迟)。

代码示例 - 消费延迟消息

消费延迟消息的代码如下:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class DelayMessageConsumer {
    public static void main(String[] args) throws Exception {
        // 创建消费者实例
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DelayConsumerGroup");
        // 设置 NameServer 地址
        consumer.setNamesrvAddr("localhost:9876");
        // 订阅主题和标签
        consumer.subscribe("DelayTopic", "TagA");

        // 注册消息监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("收到消息:" + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 启动消费者
        consumer.start();
        System.out.println("消费者已启动");
    }
}

在这段代码中,我们创建了一个 DefaultMQPushConsumer 实例,设置了消费者组和 NameServer 地址,并订阅了 DelayTopic 主题和 TagA 标签。然后注册了一个消息监听器,在监听器中处理接收到的消息。

自定义延迟级别

虽然 RocketMQ 默认提供了 18 个延迟级别,但在某些特殊业务场景下,我们可能需要自定义延迟级别。

  1. 修改 Broker 配置:在 broker.conf 文件中,添加或修改 messageDelayLevel 参数。例如,如果我们想添加 5 分钟 30 秒、10 分钟 30 秒的延迟级别,可以这样配置:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 5m30s 6m 7m 8m 9m 10m 10m30s 20m 30m 1h 2h
  1. 重启 Broker:修改配置后,需要重启 RocketMQ Broker 使配置生效。
  2. 代码使用:在生产者发送消息时,就可以使用新添加的延迟级别。例如,如果新添加的 5 分钟 30 秒延迟级别对应的索引是 10,那么可以这样设置:
message.setDelayTimeLevel(10);

定时任务实现

  1. 基于延迟消息模拟定时任务:通过发送不同延迟时间的消息,可以模拟定时任务。例如,我们可以发送一个每天定时执行任务的延迟消息。假设每天早上 8 点执行任务,我们可以计算当前时间到第二天早上 8 点的时间差,然后发送一条延迟相应时间的消息。当消息到达延迟时间后,消费者执行任务,并再次计算下一天相同时间的延迟时间,发送新的延迟消息,以此类推实现循环定时任务。
  2. 代码示例 - 基于延迟消息的定时任务
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class ScheduleTaskProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("ScheduleProducerGroup");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        // 计算到明天早上 8 点的延迟时间
        long delayTime = calculateDelayTimeToTomorrow8AM();
        Message message = new Message("ScheduleTopic", "TagA", ("定时任务消息").getBytes("UTF-8"));
        // 设置延迟时间(这里假设可以设置具体延迟时间,实际需根据自定义延迟级别或其他方式实现)
        message.setDelayTimeLevel(calculateDelayLevel(delayTime));
        SendResult sendResult = producer.send(message);
        System.out.printf("%s%n", sendResult);

        producer.shutdown();
    }

    private static long calculateDelayTimeToTomorrow8AM() {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        try {
            Date now = new Date();
            String tomorrow8AMStr = sdf.format(new Date(now.getTime() + 24 * 60 * 60 * 1000));
            tomorrow8AMStr = tomorrow8AMStr.substring(0, 11) + "08:00:00";
            Date tomorrow8AM = sdf.parse(tomorrow8AMStr);
            return tomorrow8AM.getTime() - now.getTime();
        } catch (ParseException e) {
            e.printStackTrace();
        }
        return 0;
    }

    private static int calculateDelayLevel(long delayTime) {
        // 根据自定义延迟级别计算逻辑返回对应的延迟级别索引
        // 这里简单示例,实际需根据具体配置实现
        if (delayTime >= 1 * 1000 && delayTime < 5 * 1000) {
            return 1;
        } else if (delayTime >= 5 * 1000 && delayTime < 10 * 1000) {
            return 2;
        }
        // 其他情况...
        return 0;
    }
}
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class ScheduleTaskConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ScheduleConsumerGroup");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("ScheduleTopic", "TagA");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("执行定时任务:" + new String(msg.getBody()));
                    // 再次计算并发送下一次定时任务的延迟消息
                    // 这里省略具体发送代码,可参考上面的生产者代码
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("定时任务消费者已启动");
    }
}

在上述代码中,生产者计算到明天早上 8 点的延迟时间,并发送延迟消息。消费者在接收到消息后,执行定时任务,并再次计算下一次定时任务的延迟时间并发送消息。

注意事项

  1. 延迟级别限制:虽然可以自定义延迟级别,但过多的延迟级别可能会增加 Broker 的管理负担。应根据实际业务需求合理设置。
  2. 时间准确性:由于 RocketMQ 的延迟消息是基于时间轮算法,存在一定的时间误差。在对时间精度要求极高的场景下,需要谨慎使用。
  3. 消息堆积:如果大量发送延迟消息,可能会导致 Broker 的延迟队列堆积,影响性能。需要根据 Broker 的性能和业务量合理控制消息发送频率。

通过以上对 RocketMQ 延迟消息和定时任务实现的介绍,希望能帮助开发者更好地利用 RocketMQ 的这一特性,解决实际业务中的相关问题。在实际应用中,要充分考虑各种因素,确保系统的稳定和高效运行。