RocketMQ延迟消息与定时任务实现
2022-06-012.9k 阅读
RocketMQ 延迟消息概述
RocketMQ 是一款高性能、高可靠的分布式消息队列。在许多业务场景中,我们需要消息不是立即被消费,而是在指定的一段时间后才被消费,这就涉及到延迟消息的应用。
RocketMQ 支持延迟消息的特性,允许生产者发送延迟消息,消费者在消息延迟时间到达后才能消费该消息。它的延迟级别是预先定义好的,默认情况下,RocketMQ 提供了 18 个延迟级别,分别为 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。
延迟消息应用场景
- 订单超时处理:在电商系统中,用户下单后,如果在一定时间内未完成支付,系统需要自动取消订单并释放库存。例如,用户下单后 30 分钟内未支付,此时可以发送一条延迟 30 分钟的消息,消息到达延迟时间后,消费者接收消息并执行取消订单和释放库存的操作。
- 消息重试:当消息消费失败时,有时不希望立即重试,而是延迟一段时间后重试。比如网络波动导致消息消费失败,延迟几分钟后重试,可能网络就恢复正常了,这样可以避免不必要的重复尝试。
- 定时提醒:像会议提醒、生日提醒等场景,提前设置好提醒时间,发送延迟消息,到时间后消费者触发提醒逻辑。
RocketMQ 延迟消息原理
- 消息发送:生产者在发送消息时,通过设置消息的
delayTimeLevel
属性来指定延迟级别。例如,如果想延迟 5 分钟,就设置对应的延迟级别。 - Broker 处理:Broker 收到消息后,并不会将其直接放入可消费队列。而是根据延迟级别,将消息暂存到一个延迟队列中。这个延迟队列是基于时间轮算法实现的。时间轮算法可以高效地管理定时任务,它将时间划分为一个个时间格,每个时间格代表一定的时间跨度。消息会被分配到对应的时间格中,当时间轮转动到该时间格时,消息就会被投递到可消费队列中。
- 消息消费:消费者从可消费队列中获取消息并进行消费,此时消息已经到达延迟时间。
代码示例 - 发送延迟消息
以下是使用 Java 语言,基于 RocketMQ 客户端发送延迟消息的示例代码:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class DelayMessageProducer {
public static void main(String[] args) throws Exception {
// 创建生产者实例
DefaultMQProducer producer = new DefaultMQProducer("DelayProducerGroup");
// 设置 NameServer 地址
producer.setNamesrvAddr("localhost:9876");
// 启动生产者
producer.start();
for (int i = 0; i < 10; i++) {
// 创建消息实例,指定主题、标签和消息体
Message message = new Message("DelayTopic", "TagA", ("Hello RocketMQ " + i).getBytes("UTF-8"));
// 设置延迟级别,这里设置为延迟 5 分钟(对应延迟级别 9)
message.setDelayTimeLevel(9);
// 发送消息
SendResult sendResult = producer.send(message);
System.out.printf("%s%n", sendResult);
}
// 关闭生产者
producer.shutdown();
}
}
在上述代码中,我们创建了一个 DefaultMQProducer
实例,并设置了生产者组和 NameServer 地址。然后,循环发送 10 条消息,每条消息都设置了延迟级别为 9(对应 5 分钟延迟)。
代码示例 - 消费延迟消息
消费延迟消息的代码如下:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class DelayMessageConsumer {
public static void main(String[] args) throws Exception {
// 创建消费者实例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DelayConsumerGroup");
// 设置 NameServer 地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅主题和标签
consumer.subscribe("DelayTopic", "TagA");
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("收到消息:" + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
System.out.println("消费者已启动");
}
}
在这段代码中,我们创建了一个 DefaultMQPushConsumer
实例,设置了消费者组和 NameServer 地址,并订阅了 DelayTopic
主题和 TagA
标签。然后注册了一个消息监听器,在监听器中处理接收到的消息。
自定义延迟级别
虽然 RocketMQ 默认提供了 18 个延迟级别,但在某些特殊业务场景下,我们可能需要自定义延迟级别。
- 修改 Broker 配置:在
broker.conf
文件中,添加或修改messageDelayLevel
参数。例如,如果我们想添加 5 分钟 30 秒、10 分钟 30 秒的延迟级别,可以这样配置:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 5m30s 6m 7m 8m 9m 10m 10m30s 20m 30m 1h 2h
- 重启 Broker:修改配置后,需要重启 RocketMQ Broker 使配置生效。
- 代码使用:在生产者发送消息时,就可以使用新添加的延迟级别。例如,如果新添加的 5 分钟 30 秒延迟级别对应的索引是 10,那么可以这样设置:
message.setDelayTimeLevel(10);
定时任务实现
- 基于延迟消息模拟定时任务:通过发送不同延迟时间的消息,可以模拟定时任务。例如,我们可以发送一个每天定时执行任务的延迟消息。假设每天早上 8 点执行任务,我们可以计算当前时间到第二天早上 8 点的时间差,然后发送一条延迟相应时间的消息。当消息到达延迟时间后,消费者执行任务,并再次计算下一天相同时间的延迟时间,发送新的延迟消息,以此类推实现循环定时任务。
- 代码示例 - 基于延迟消息的定时任务
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
public class ScheduleTaskProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("ScheduleProducerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();
// 计算到明天早上 8 点的延迟时间
long delayTime = calculateDelayTimeToTomorrow8AM();
Message message = new Message("ScheduleTopic", "TagA", ("定时任务消息").getBytes("UTF-8"));
// 设置延迟时间(这里假设可以设置具体延迟时间,实际需根据自定义延迟级别或其他方式实现)
message.setDelayTimeLevel(calculateDelayLevel(delayTime));
SendResult sendResult = producer.send(message);
System.out.printf("%s%n", sendResult);
producer.shutdown();
}
private static long calculateDelayTimeToTomorrow8AM() {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
try {
Date now = new Date();
String tomorrow8AMStr = sdf.format(new Date(now.getTime() + 24 * 60 * 60 * 1000));
tomorrow8AMStr = tomorrow8AMStr.substring(0, 11) + "08:00:00";
Date tomorrow8AM = sdf.parse(tomorrow8AMStr);
return tomorrow8AM.getTime() - now.getTime();
} catch (ParseException e) {
e.printStackTrace();
}
return 0;
}
private static int calculateDelayLevel(long delayTime) {
// 根据自定义延迟级别计算逻辑返回对应的延迟级别索引
// 这里简单示例,实际需根据具体配置实现
if (delayTime >= 1 * 1000 && delayTime < 5 * 1000) {
return 1;
} else if (delayTime >= 5 * 1000 && delayTime < 10 * 1000) {
return 2;
}
// 其他情况...
return 0;
}
}
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class ScheduleTaskConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ScheduleConsumerGroup");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("ScheduleTopic", "TagA");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("执行定时任务:" + new String(msg.getBody()));
// 再次计算并发送下一次定时任务的延迟消息
// 这里省略具体发送代码,可参考上面的生产者代码
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("定时任务消费者已启动");
}
}
在上述代码中,生产者计算到明天早上 8 点的延迟时间,并发送延迟消息。消费者在接收到消息后,执行定时任务,并再次计算下一次定时任务的延迟时间并发送消息。
注意事项
- 延迟级别限制:虽然可以自定义延迟级别,但过多的延迟级别可能会增加 Broker 的管理负担。应根据实际业务需求合理设置。
- 时间准确性:由于 RocketMQ 的延迟消息是基于时间轮算法,存在一定的时间误差。在对时间精度要求极高的场景下,需要谨慎使用。
- 消息堆积:如果大量发送延迟消息,可能会导致 Broker 的延迟队列堆积,影响性能。需要根据 Broker 的性能和业务量合理控制消息发送频率。
通过以上对 RocketMQ 延迟消息和定时任务实现的介绍,希望能帮助开发者更好地利用 RocketMQ 的这一特性,解决实际业务中的相关问题。在实际应用中,要充分考虑各种因素,确保系统的稳定和高效运行。