RocketMQ深入:消费者(Consumer)的消费模式与策略
2023-08-222.6k 阅读
RocketMQ消费者概述
RocketMQ的消费者(Consumer)是整个消息队列系统中负责接收并处理消息的重要组件。消费者从RocketMQ的消息队列(Message Queue)中拉取消息,并按照预设的逻辑进行消费处理。在实际应用场景中,消费者的行为和策略对于保证消息的可靠处理、系统的高可用性以及性能优化都起着关键作用。
消费模式
集群消费(Cluster Consumption)
- 原理:在集群消费模式下,多个消费者实例共同组成一个消费组(Consumer Group)。一个消费组内的消费者会分摊消费消息的任务。RocketMQ会将每个Topic下的消息队列平均分配给消费组内的各个消费者实例。例如,假设有一个Topic包含4个消息队列,消费组内有2个消费者实例,那么每个消费者实例会负责消费其中2个消息队列中的消息。
- 特点:
- 负载均衡:消费组内的消费者实例能够自动进行负载均衡,有效地提高了消息处理的并发能力。当某个消费者实例出现故障时,RocketMQ会自动将其负责的消息队列重新分配给其他正常的消费者实例,保证消息不会丢失且能够继续被消费。
- 重复消费:由于消息队列的动态分配,可能会出现消息被重复消费的情况。比如在消费者实例故障转移过程中,新接手消息队列的消费者可能会消费到之前已经被故障消费者处理过一部分的消息。
- 代码示例:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.List;
public class ClusterConsumer {
public static void main(String[] args) throws Exception {
// 创建消费者实例,指定消费组名称
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Cluster_Consumer_Group");
// 设置NameServer地址
consumer.setNamesrvAddr("localhost:9876");
// 设置消费模式为集群消费
consumer.setMessageModel(MessageModel.CLUSTERING);
// 订阅Topic
consumer.subscribe("TestTopic", "*");
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
try {
String messageBody = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.out.println("Consume Message: " + messageBody);
} catch (Exception e) {
e.printStackTrace();
// 消费失败,返回RECONSUME_LATER,RocketMQ会重试消费
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
// 消费成功,返回CONSUME_SUCCESS
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
System.out.println("Cluster Consumer Started.");
}
}
广播消费(Broadcast Consumption)
- 原理:在广播消费模式下,一个消费组内的所有消费者实例都会收到Topic下的每一条消息。即每个消费者实例都会独立地消费Topic中的全部消息,而不像集群消费那样进行消息队列的分摊。
- 特点:
- 消息全量接收:适用于需要所有消费者都处理同一条消息的场景,如配置更新消息,所有相关服务都需要获取到最新的配置信息。
- 无负载均衡:由于每个消费者都要接收并处理全部消息,不存在负载均衡机制,可能会对单个消费者的性能造成较大压力。同时,如果消费者实例数量较多,可能会导致消息处理的资源浪费。
- 代码示例:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.List;
public class BroadcastConsumer {
public static void main(String[] args) throws Exception {
// 创建消费者实例,指定消费组名称
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Broadcast_Consumer_Group");
// 设置NameServer地址
consumer.setNamesrvAddr("localhost:9876");
// 设置消费模式为广播消费
consumer.setMessageModel(MessageModel.BROADCASTING);
// 订阅Topic
consumer.subscribe("TestTopic", "*");
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
try {
String messageBody = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.out.println("Consume Message: " + messageBody);
} catch (Exception e) {
e.printStackTrace();
// 消费失败,返回RECONSUME_LATER,RocketMQ会重试消费
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
// 消费成功,返回CONSUME_SUCCESS
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
System.out.println("Broadcast Consumer Started.");
}
}
消费策略
顺序消费
- 原理:顺序消费保证消息的消费顺序与消息的发送顺序一致。在RocketMQ中,顺序消费是基于消息队列(Message Queue)实现的。同一个消息队列中的消息会按照先进先出(FIFO)的顺序被消费者消费。如果要实现全局顺序消费,需要保证所有消息都发送到同一个消息队列中,但这会严重影响系统的并发性能。因此,通常会采用局部顺序消费,即根据业务场景将相关的消息发送到同一个消息队列中,保证这部分消息的顺序性。
- 应用场景:
- 订单处理:在电商系统中,订单的创建、支付、发货等流程需要按照顺序处理,否则可能会出现支付完成但订单未创建,或者货物已发出但未收到款项的情况。
- 数据库事务日志:数据库的事务日志记录需要按照发生的顺序进行处理,以保证数据的一致性和完整性。
- 代码示例:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.List;
public class OrderlyConsumer {
public static void main(String[] args) throws Exception {
// 创建消费者实例,指定消费组名称
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Orderly_Consumer_Group");
// 设置NameServer地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅Topic
consumer.subscribe("OrderTopic", "*");
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
try {
String messageBody = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.out.println("Consume Orderly Message: " + messageBody);
} catch (Exception e) {
e.printStackTrace();
// 消费失败,返回SUSPEND_CURRENT_QUEUE_A_MOMENT,RocketMQ会暂停当前队列消费一段时间后重试
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
// 消费成功,返回SUCCESS
return ConsumeOrderlyStatus.SUCCESS;
}
});
// 启动消费者
consumer.start();
System.out.println("Orderly Consumer Started.");
}
}
并发消费
- 原理:并发消费允许消费者同时处理多条消息,以提高消息的处理效率。在集群消费模式下,RocketMQ默认采用并发消费方式。消费者从分配给自己的消息队列中拉取消息后,会开启多个线程同时处理这些消息。
- 应用场景:
- 日志处理:大量日志消息的收集和分析场景中,这些日志消息之间通常没有顺序依赖关系,可以通过并发消费快速处理。
- 数据分析:在大数据分析场景中,对海量数据的统计和分析任务可以通过并发消费消息来加速处理过程。
- 并发消费的调优:
- 线程池配置:可以通过调整消费者的线程池参数来优化并发消费性能。例如,
DefaultMQPushConsumer
类中的setConsumeThreadMin
和setConsumeThreadMax
方法可以设置消费线程池的最小和最大线程数。合理设置这些参数可以根据系统的硬件资源和消息处理负载来平衡并发处理能力和资源消耗。 - 批量消费:RocketMQ支持批量消费,即一次拉取多条消息进行处理。可以通过
DefaultMQPushConsumer
的setConsumeMessageBatchMaxSize
方法设置每次批量消费的最大消息数量。批量消费可以减少网络开销,提高消费效率,但需要注意消息处理逻辑是否能够适应批量处理的情况。
- 线程池配置:可以通过调整消费者的线程池参数来优化并发消费性能。例如,
消息过滤策略
标签过滤
- 原理:在RocketMQ中,消息可以带有标签(Tag)。标签是对消息的一种分类标识,消费者可以通过订阅指定的标签来过滤消息。例如,一个电商系统的消息队列中,可能会有订单相关消息,这些消息可以根据不同的订单类型(如普通订单、团购订单、秒杀订单等)设置不同的标签。消费者如果只关心普通订单消息,就可以订阅普通订单标签的消息。
- 代码示例:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.List;
public class TagFilterConsumer {
public static void main(String[] args) throws Exception {
// 创建消费者实例,指定消费组名称
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TagFilter_Consumer_Group");
// 设置NameServer地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅Topic及标签,这里订阅普通订单标签的消息
consumer.subscribe("OrderTopic", "NormalOrderTag");
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
try {
String messageBody = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.out.println("Consume Message with Tag: " + messageBody);
} catch (Exception e) {
e.printStackTrace();
// 消费失败,返回RECONSUME_LATER,RocketMQ会重试消费
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
// 消费成功,返回CONSUME_SUCCESS
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
System.out.println("Tag Filter Consumer Started.");
}
}
SQL92表达式过滤
- 原理:RocketMQ支持使用SQL92表达式进行消息过滤,这为消息过滤提供了更强大和灵活的方式。消息在发送时,可以在消息属性(Properties)中设置一些自定义的键值对。消费者在订阅消息时,可以使用SQL92表达式对这些消息属性进行过滤。例如,消息属性中设置了订单金额(orderAmount)和订单状态(orderStatus)等信息,消费者可以通过SQL表达式
orderAmount > 100 AND orderStatus = 'PAID'
来过滤出金额大于100且已支付的订单消息。 - 使用限制:
- 性能影响:SQL过滤会增加Broker的处理负担,因为Broker需要对每一条消息进行SQL表达式的计算来判断是否符合过滤条件。因此,在使用SQL过滤时,需要谨慎评估对系统性能的影响。
- 数据类型支持:目前RocketMQ支持的SQL表达式数据类型有限,主要包括
int
、long
、float
、double
、boolean
和string
。并且,string
类型只支持简单的相等比较,不支持复杂的字符串操作。
- 代码示例:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.List;
public class SqlFilterConsumer {
public static void main(String[] args) throws Exception {
// 创建消费者实例,指定消费组名称
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("SqlFilter_Consumer_Group");
// 设置NameServer地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅Topic及SQL表达式过滤条件
consumer.subscribe("OrderTopic", "orderAmount > 100 AND orderStatus = 'PAID'");
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
try {
String messageBody = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.out.println("Consume Message with SQL Filter: " + messageBody);
} catch (Exception e) {
e.printStackTrace();
// 消费失败,返回RECONSUME_LATER,RocketMQ会重试消费
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
// 消费成功,返回CONSUME_SUCCESS
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
System.out.println("Sql Filter Consumer Started.");
}
}
消费重试机制
- 自动重试:当消费者消费消息失败时,RocketMQ会自动进行重试。默认情况下,集群消费模式下,消费失败的消息会被重新放回消息队列的队尾,等待下次被消费。重试的次数和间隔时间可以通过配置进行调整。例如,
DefaultMQPushConsumer
类中的setMaxReconsumeTimes
方法可以设置最大重试次数,默认值为16次。每次重试的间隔时间会逐渐增加,从10秒开始,每次翻倍,直到最大间隔时间为1280秒。 - 死信队列:如果消息经过多次重试后仍然消费失败,RocketMQ会将该消息发送到死信队列(Dead Letter Queue)。死信队列是一个特殊的队列,用于存放无法正常消费的消息。可以通过管理工具或代码对死信队列中的消息进行分析和处理,找出消费失败的原因并进行修复。
- 代码示例:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.List;
public class RetryConsumer {
public static void main(String[] args) throws Exception {
// 创建消费者实例,指定消费组名称
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Retry_Consumer_Group");
// 设置NameServer地址
consumer.setNamesrvAddr("localhost:9876");
// 设置最大重试次数
consumer.setMaxReconsumeTimes(3);
// 订阅Topic
consumer.subscribe("TestTopic", "*");
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
try {
String messageBody = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.out.println("Consume Message: " + messageBody);
// 模拟消费失败
throw new RuntimeException("Consume failed");
} catch (Exception e) {
e.printStackTrace();
// 消费失败,返回RECONSUME_LATER,RocketMQ会重试消费
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
// 消费成功,返回CONSUME_SUCCESS
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
System.out.println("Retry Consumer Started.");
}
}
消费进度管理
- 本地存储:RocketMQ的消费者会将消费进度(Consume Offset)本地存储在消费者实例所在的机器上。默认情况下,消费进度存储在
$HOME/store/commitlog/consumeOffset.json
文件中。当消费者启动时,会从本地文件中读取消费进度,继续从上次消费的位置开始消费消息。这种本地存储方式可以提高消费进度的读取和更新效率,减少与Broker的交互。 - Broker同步:消费者会定期将本地的消费进度同步到Broker。这样做的目的是为了在消费者实例故障或重启时,能够从Broker获取最新的消费进度,保证消息消费的连续性。同步的频率可以通过
DefaultMQPushConsumer
类中的setPersistConsumerOffsetInterval
方法进行设置,默认值为5000毫秒,即每5秒同步一次消费进度到Broker。 - 消费进度的重置:在某些特殊情况下,可能需要重置消费进度,例如重新消费历史消息进行数据分析或问题排查。可以通过管理工具或代码来重置消费进度。在代码中,可以使用
DefaultMQPushConsumer
的seek
方法来设置指定消息队列的消费偏移量,从而实现消费进度的重置。
总结
RocketMQ消费者的消费模式和策略为后端开发人员提供了丰富的选择,以满足不同业务场景的需求。通过合理选择消费模式(集群消费或广播消费)、消费策略(顺序消费或并发消费)、消息过滤策略以及正确处理消费重试和消费进度管理,可以构建出高效、可靠的消息驱动的后端应用系统。在实际应用中,需要根据业务特点和系统性能要求,灵活调整和优化这些配置和策略,以充分发挥RocketMQ的优势。同时,深入理解这些机制的原理和实现,有助于在遇到问题时能够快速定位和解决,保障系统的稳定运行。