MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

RocketMQ消费者(Consumer)API使用指南

2023-11-174.7k 阅读

1. RocketMQ消费者概述

在分布式系统中,消息队列扮演着至关重要的角色,RocketMQ作为一款高性能、高可靠的消息队列,为开发者提供了丰富的功能。消费者(Consumer)是RocketMQ消息处理流程中的关键一环,负责从消息队列中获取并处理消息。

RocketMQ消费者具有多种消费模式,包括集群消费(Cluster)和广播消费(Broadcast)。集群消费模式下,同一消费组内的消费者会平均分摊消息,适合高吞吐量的场景;广播消费模式则是每个消费者都会收到所有消息,适用于需要每个节点都处理全量消息的场景。

2. 引入依赖

要使用RocketMQ消费者API,首先需要在项目中引入相应的依赖。如果使用Maven项目管理工具,可以在pom.xml文件中添加如下依赖:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.9.4</version>
</dependency>

这里以4.9.4版本为例,实际使用中可以根据项目需求选择合适的版本。

3. 简单消费者示例

3.1 集群消费示例

下面是一个简单的集群消费示例代码:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.util.List;

public class ClusterConsumerExample {
    public static void main(String[] args) throws Exception {
        // 创建DefaultMQPushConsumer实例,指定消费组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ClusterConsumerGroup");
        // 设置NameServer地址
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // 设置消费模式为集群消费
        consumer.setMessageModel(MessageModel.CLUSTERING);
        // 订阅主题和标签,这里订阅TopicTest主题下所有标签
        consumer.subscribe("TopicTest", "*");

        // 注册消息监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("收到消息:" + new String(msg.getBody()));
                }
                // 返回消费成功状态
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 启动消费者
        consumer.start();
        System.out.println("消费者已启动");
    }
}

在上述代码中:

  • 首先创建了一个DefaultMQPushConsumer实例,并指定了消费组名ClusterConsumerGroup。消费组是具有相同消费逻辑的消费者的集合,同一消费组内的消费者会共同消费消息。
  • 通过setNamesrvAddr方法设置了NameServer的地址,NameServer是RocketMQ的名称服务,用于保存Topic和Broker的映射关系。
  • 使用setMessageModel方法将消费模式设置为集群消费CLUSTERING
  • 通过subscribe方法订阅了TopicTest主题下的所有消息(*表示所有标签)。
  • 注册了一个MessageListenerConcurrently消息监听器,在consumeMessage方法中处理接收到的消息,这里简单地将消息内容打印出来,并返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS表示消费成功。
  • 最后调用start方法启动消费者。

3.2 广播消费示例

广播消费示例代码如下:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.util.List;

public class BroadcastConsumerExample {
    public static void main(String[] args) throws Exception {
        // 创建DefaultMQPushConsumer实例,指定消费组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("BroadcastConsumerGroup");
        // 设置NameServer地址
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // 设置消费模式为广播消费
        consumer.setMessageModel(MessageModel.BROADCASTING);
        // 订阅主题和标签,这里订阅TopicTest主题下所有标签
        consumer.subscribe("TopicTest", "*");

        // 注册消息监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("收到消息:" + new String(msg.getBody()));
                }
                // 返回消费成功状态
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 启动消费者
        consumer.start();
        System.out.println("消费者已启动");
    }
}

与集群消费示例相比,广播消费示例主要区别在于通过setMessageModel方法将消费模式设置为BROADCASTING,这样每个消费者都会收到主题下的所有消息。

4. 消费者配置详解

4.1 消费模式配置

如前面示例所示,通过setMessageModel方法可以设置消费模式。

  • 集群消费(CLUSTERING): 在集群消费模式下,RocketMQ会根据消费者的数量对消息队列进行负载均衡。假设一个主题有4个队列,集群中有2个消费者实例,那么每个消费者实例会负责消费2个队列中的消息。这种模式适合大多数业务场景,特别是需要高吞吐量处理消息的场景,因为多个消费者可以并行处理消息,提高整体的处理效率。
  • 广播消费(BROADCASTING): 广播消费模式下,每个消费者都会收到主题下的所有消息。这在一些特殊场景下非常有用,比如需要每个节点都对消息进行处理,例如系统配置更新的消息,每个节点都需要知道并更新自己的配置。但由于每个消费者都要处理全量消息,可能会对系统资源造成较大压力,不适合高吞吐量的场景。

4.2 订阅关系配置

通过subscribe方法可以订阅主题和标签。

  • 订阅主题和所有标签consumer.subscribe("TopicTest", "*") 这种方式表示订阅TopicTest主题下的所有消息,无论消息的标签是什么。在实际应用中,如果不需要根据标签来过滤消息,这种方式可以简单方便地获取主题下的所有消息。
  • 订阅主题和特定标签consumer.subscribe("TopicTest", "TagA || TagB") 此方式表示只订阅TopicTest主题下标签为TagATagB的消息。标签是RocketMQ提供的一种简单的消息过滤方式,通过设置和订阅标签,可以让消费者只接收感兴趣的消息,减少不必要的消息处理。

4.3 并发消费相关配置

RocketMQ消费者支持并发消费,通过一些配置参数可以控制并发消费的行为。

  • 设置最大并发消费线程数consumer.setConsumeThreadMax(20) 该方法设置了消费者实例在并发消费时最大的线程数。默认值为20,如果业务处理消息的逻辑比较复杂,需要更多的线程来提高并发处理能力,可以适当增大这个值。但也要注意系统资源的限制,过多的线程可能会导致系统资源耗尽。
  • 设置最小并发消费线程数consumer.setConsumeThreadMin(10) 此方法设置了消费者实例在并发消费时最小的线程数。默认值为10,当系统负载较低时,消费者会保持最小线程数来处理消息,以节省系统资源。

4.4 消息重试相关配置

当消息消费失败时,RocketMQ支持自动重试机制。

  • 设置最大重试次数consumer.setMaxReconsumeTimes(3) 该方法设置了消息最大的重试次数,默认值为16次。当消息消费返回ConsumeConcurrentlyStatus.RECONSUME_LATER状态时,RocketMQ会在一定时间间隔后重新投递该消息给消费者进行消费,直到达到最大重试次数。
  • 重试间隔时间: RocketMQ的重试间隔时间是递增的,第一次重试间隔10秒,第二次20秒,第三次30秒,以此类推,每次递增10秒,直到16次重试后间隔时间为160秒。如果需要自定义重试间隔时间,可以通过修改MQClientInstanceretryTopicQueueNums等相关配置来实现,但这需要对RocketMQ内部机制有较深入的了解。

5. 消息监听器

5.1 并发消息监听器(MessageListenerConcurrently)

在前面的示例中,我们已经使用过MessageListenerConcurrently。它会将一批消息(默认一次最多拉取32条)同时传递给consumeMessage方法进行处理。

consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for (MessageExt msg : msgs) {
            // 处理消息逻辑
        }
        // 返回消费状态
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});

在这种模式下,消息处理逻辑是并发执行的,适合处理相互之间没有依赖关系的消息。如果这批消息中有部分消息处理失败,整个处理过程可以选择返回ConsumeConcurrentlyStatus.RECONSUME_LATER,让RocketMQ重新投递这批消息。

5.2 顺序消息监听器(MessageListenerOrderly)

顺序消息是指消息的消费顺序和发送顺序一致。在一些业务场景中,比如订单处理,订单的创建、支付、发货等操作需要按照顺序进行,这就需要使用顺序消息监听器。

consumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        for (MessageExt msg : msgs) {
            // 处理消息逻辑
        }
        // 返回消费状态
        return ConsumeOrderlyStatus.SUCCESS;
    }
});

MessageListenerOrderly会按照消息的顺序依次处理消息,同一队列中的消息会被一个线程顺序消费。如果消息处理失败,可以返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT,暂停当前队列的消费,稍后RocketMQ会重新投递消息。

6. 消费进度管理

6.1 自动提交消费进度

RocketMQ默认采用自动提交消费进度的方式。在集群消费模式下,消费者会定期将自己的消费进度提交到Broker。当消费者重启或者出现故障恢复后,RocketMQ会根据提交的消费进度继续从上次消费的位置开始消费消息。这种方式简单方便,适用于大多数场景。

6.2 手动提交消费进度

在某些情况下,可能需要手动控制消费进度的提交。例如,在处理一批消息时,需要确保所有消息都成功处理后才提交消费进度,以避免部分消息处理成功但进度提交后导致其他消费者重复消费的问题。

consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for (MessageExt msg : msgs) {
            // 处理消息逻辑
        }
        // 手动提交消费进度
        context.setAckIndex(msgs.size());
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});

在上述代码中,通过context.setAckIndex(msgs.size())手动设置了消费进度,表示这批消息全部处理完成,可以提交消费进度。

7. 异常处理

7.1 消费异常处理

在消息消费过程中,可能会出现各种异常情况。例如,业务逻辑处理失败、网络异常等。当出现异常时,根据不同的消费模式和监听器类型,有不同的处理方式。

  • 并发消费异常处理: 如果在MessageListenerConcurrently中处理消息出现异常,可以返回ConsumeConcurrentlyStatus.RECONSUME_LATER,让RocketMQ重新投递消息。同时,可以记录异常日志,方便后续排查问题。
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        try {
            for (MessageExt msg : msgs) {
                // 处理消息逻辑
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        } catch (Exception e) {
            // 记录异常日志
            e.printStackTrace();
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }
});
  • 顺序消费异常处理: 在MessageListenerOrderly中,如果处理消息出现异常,可以返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT,暂停当前队列的消费,等待RocketMQ重新投递消息。同样,也需要记录异常日志。
consumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        try {
            for (MessageExt msg : msgs) {
                // 处理消息逻辑
            }
            return ConsumeOrderlyStatus.SUCCESS;
        } catch (Exception e) {
            // 记录异常日志
            e.printStackTrace();
            return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
        }
    }
});

7.2 连接异常处理

在消费者与NameServer或Broker建立连接过程中,可能会出现连接超时、网络中断等异常情况。RocketMQ客户端提供了一定的重试机制来处理这些异常。例如,当与NameServer连接失败时,客户端会按照一定的时间间隔进行重试,直到连接成功或者达到最大重试次数。可以通过配置DefaultMQPushConsumerretryTimesWhenSendFailed等参数来调整重试策略。

8. 高级特性

8.1 批量消费

RocketMQ支持批量消费,通过批量拉取和处理消息,可以减少网络开销,提高消费效率。

// 设置每次拉取的最大消息数量
consumer.setPullBatchSize(32);

consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for (MessageExt msg : msgs) {
            // 处理消息逻辑
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});

在上述代码中,通过setPullBatchSize方法设置了每次拉取的最大消息数量为32条。批量消费时需要注意消息的总大小不能超过Broker配置的限制,默认单个消息最大为4MB,批量消息大小也会有相应限制。

8.2 消息过滤

除了通过标签进行简单的消息过滤外,RocketMQ还支持SQL92表达式的消息过滤。要使用SQL92过滤,需要在Broker配置文件中开启相关功能。

consumer.subscribe("TopicTest", MessageSelector.bySql("age > 18 AND gender = 'male'"));

上述代码表示订阅TopicTest主题下,消息属性中age大于18且gendermale的消息。在发送消息时,需要设置相应的消息属性。

Message msg = new Message("TopicTest", "TagA", "key1", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
msg.putUserProperty("age", "20");
msg.putUserProperty("gender", "male");
producer.send(msg);

8.3 事务消息消费

事务消息在一些分布式事务场景中非常有用。RocketMQ的事务消息消费与普通消息消费类似,但在处理事务消息时,需要注意消息的二次确认。当消费者接收到事务消息时,首先会进行预消费,此时消息处于半消息状态。只有当事务提交后,消息才会真正被消费。如果事务回滚,消息会被删除。

consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for (MessageExt msg : msgs) {
            // 判断是否是事务消息
            if (msg.getTransactionId() != null) {
                // 处理事务消息逻辑
            } else {
                // 处理普通消息逻辑
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});

9. 性能优化

9.1 合理设置并发线程数

根据业务场景和系统资源情况,合理设置消费者的并发线程数。如果业务处理消息的逻辑比较简单,可以适当增大并发线程数,提高消息处理的吞吐量。但如果业务逻辑复杂,过多的并发线程可能会导致系统资源竞争,反而降低处理效率。可以通过性能测试来确定最优的并发线程数。

9.2 批量处理和异步处理

采用批量消费和异步处理的方式可以提高性能。批量消费减少了网络请求次数,而异步处理可以让消息处理逻辑在后台线程中执行,不阻塞主线程,提高系统的响应速度。例如,可以使用Java的CompletableFuture来实现异步处理消息。

consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        CompletableFuture.runAsync(() -> {
            for (MessageExt msg : msgs) {
                // 处理消息逻辑
            }
        });
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});

9.3 优化网络配置

合理配置网络参数,如TCP连接超时时间、缓冲区大小等,可以提高网络传输效率。在RocketMQ客户端中,可以通过DefaultMQPushConsumer的相关配置参数来调整网络相关的设置,例如setClientSocketSndBufSize设置发送缓冲区大小,setClientSocketRcvBufSize设置接收缓冲区大小等。

10. 常见问题及解决方法

10.1 消息丢失问题

  • 原因分析: 消息丢失可能发生在多个环节。例如,生产者发送消息时网络异常导致消息未成功到达Broker;Broker在存储消息时出现故障,导致消息丢失;消费者消费消息时,处理过程中出现异常但未正确处理,导致消息被认为消费成功而实际上未处理。
  • 解决方法: 对于生产者,可以使用同步发送方式,并设置合适的超时时间,确保消息成功发送到Broker。在Broker端,可以配置多副本机制,提高消息存储的可靠性。对于消费者,要正确处理消费异常,确保消息不会因为异常而丢失。同时,可以开启消息持久化,即使Broker重启,消息也不会丢失。

10.2 消息重复消费问题

  • 原因分析: 在网络波动、消费者处理超时等情况下,可能会导致消息重复消费。例如,消费者处理消息时返回消费成功状态,但由于网络问题,Broker未收到确认,会重新投递消息。
  • 解决方法: 可以在消息中添加唯一标识,消费者在处理消息前先检查是否已经处理过该消息。另外,保证业务处理的幂等性,即多次处理相同消息的结果是一致的。例如,在数据库操作中,可以使用INSERT ... ON DUPLICATE KEY UPDATE语句来保证幂等性。

10.3 消费速度慢问题

  • 原因分析: 消费速度慢可能是由于业务处理逻辑复杂、消费者并发线程数设置不合理、网络延迟等原因导致。
  • 解决方法: 优化业务处理逻辑,减少不必要的计算和I/O操作。根据系统资源和业务需求,合理调整消费者的并发线程数。检查网络环境,优化网络配置,减少网络延迟。同时,可以使用批量消费和异步处理等方式提高消费速度。

通过以上对RocketMQ消费者API的详细介绍、代码示例、配置说明、高级特性以及性能优化和常见问题解决等方面的阐述,相信开发者能够更好地使用RocketMQ消费者进行后端开发,构建高性能、高可靠的分布式系统。