MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

RocketMQ API使用详解与最佳实践

2021-01-296.4k 阅读

RocketMQ API 基础概念

RocketMQ 是一款分布式消息中间件,具有高吞吐量、高可用性、适合大规模分布式系统等特点。在深入探讨其 API 使用之前,先了解一些关键概念。

  • Producer:消息生产者,负责发送消息到 RocketMQ 服务器。生产者可以是应用系统中的业务逻辑模块,例如电商系统中的订单创建模块,当一个新订单生成时,该模块作为生产者向 RocketMQ 发送订单相关消息。
  • Consumer:消息消费者,从 RocketMQ 服务器接收消息并进行相应处理。比如在电商系统中,库存管理模块可以作为消费者,接收订单创建消息后进行库存扣减等操作。
  • Topic:主题,是消息的逻辑分类。不同类型的消息可以发送到不同的 Topic,例如电商系统中,订单相关消息可以发送到 “order - topic”,物流相关消息可以发送到 “logistics - topic”。
  • Queue:队列,是 Topic 的物理分区。一个 Topic 可以包含多个 Queue,通过设置多个 Queue 可以提高消息的并行处理能力。例如,“order - topic” 可以设置 4 个 Queue,当有大量订单消息时,不同的消费者可以并行从不同的 Queue 中拉取消息进行处理。

环境搭建

在使用 RocketMQ API 之前,需要搭建 RocketMQ 运行环境。这里以 Linux 环境为例进行说明。

  1. 下载 RocketMQ:从 RocketMQ 官方网站下载最新版本的安装包,例如 rocketmq - all - 4.9.4 - bin - release.zip。
  2. 解压安装包:使用命令 unzip rocketmq - all - 4.9.4 - bin - release.zip 解压安装包到指定目录。
  3. 启动 NameServer:进入 RocketMQ 解压目录,执行 nohup sh bin/mqnamesrv & 启动 NameServer。NameServer 是 RocketMQ 的名称服务,负责保存 Topic 和 Broker 的映射关系。
  4. 启动 Broker:在启动 NameServer 后,执行 nohup sh bin/mqbroker - n localhost:9876 & 启动 Broker。Broker 负责存储和转发消息。

发送消息

同步发送

同步发送是指消息发送后,生产者会等待 RocketMQ 服务器的响应,只有收到响应后才会继续执行后续代码。这种方式适用于对消息发送可靠性要求较高的场景,例如订单创建消息,必须确保消息成功发送到 RocketMQ 服务器。

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class SyncProducer {
    public static void main(String[] args) throws Exception {
        // 创建一个生产者实例
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        // 设置 NameServer 地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动生产者
        producer.start();

        for (int i = 0; i < 10; i++) {
            // 创建消息实例,指定 Topic、Tag 和消息体
            Message message = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes("UTF - 8"));
            // 同步发送消息,等待服务器响应
            SendResult sendResult = producer.send(message);
            System.out.printf("%s%n", sendResult);
        }

        // 关闭生产者
        producer.shutdown();
    }
}

在上述代码中,首先创建了一个 DefaultMQProducer 实例,并指定生产者组为 “group1”。然后设置 NameServer 地址,启动生产者。在循环中,创建消息并同步发送,最后打印发送结果。发送完成后,关闭生产者。

异步发送

异步发送是指消息发送后,生产者不会等待服务器响应,而是继续执行后续代码。当服务器响应到达时,通过回调函数进行处理。这种方式适用于对消息发送性能要求较高,且允许一定概率消息发送失败的场景,例如日志消息的发送。

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("group2");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        for (int i = 0; i < 10; i++) {
            final int index = i;
            Message message = new Message("TopicTest", "TagB", ("Hello RocketMQ Async " + i).getBytes("UTF - 8"));
            producer.send(message, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.printf("%d Send message success: %s%n", index, sendResult);
                }

                @Override
                public void onException(Throwable e) {
                    System.out.printf("%d Send message failed: %s%n", index, e);
                }
            });
        }

        Thread.sleep(10000);
        producer.shutdown();
    }
}

在这段代码中,同样创建了生产者实例并启动。在发送消息时,通过 producer.send(message, new SendCallback()) 方法传入回调函数。当消息发送成功时,onSuccess 方法会被调用;当发送失败时,onException 方法会被调用。最后通过 Thread.sleep(10000) 等待一段时间,确保异步回调函数有足够时间执行,然后关闭生产者。

单向发送

单向发送是指消息发送后,生产者既不等待服务器响应,也不通过回调函数处理响应。这种方式适用于对消息可靠性要求较低,且对性能要求极高的场景,例如一些监控数据的上报。

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class OnewayProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("group3");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        for (int i = 0; i < 10; i++) {
            Message message = new Message("TopicTest", "TagC", ("Hello RocketMQ Oneway " + i).getBytes("UTF - 8"));
            producer.sendOneway(message);
        }

        producer.shutdown();
    }
}

此代码创建生产者并启动后,在循环中使用 producer.sendOneway(message) 方法单向发送消息,发送完成后关闭生产者。由于不关心发送结果,这种方式性能最高,但可能会丢失消息。

接收消息

推模式(PushConsumer)

推模式下,RocketMQ 服务器会主动将消息推送给消费者。消费者通过注册监听器的方式来处理接收到的消息。

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class PushConsumer {
    public static void main(String[] args) throws Exception {
        // 创建一个消费者实例
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group4");
        // 设置 NameServer 地址
        consumer.setNamesrvAddr("localhost:9876");
        // 订阅 Topic 和 Tag
        consumer.subscribe("TopicTest", "TagA || TagB");
        // 注册消息监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动消费者
        consumer.start();
        System.out.println("Consumer Started.");
    }
}

在上述代码中,创建了 DefaultMQPushConsumer 实例,并指定消费者组为 “group4”。设置 NameServer 地址后,通过 consumer.subscribe("TopicTest", "TagA || TagB") 订阅 “TopicTest” 主题下的 “TagA” 和 “TagB” 消息。接着注册消息监听器,在监听器的 consumeMessage 方法中处理接收到的消息。最后启动消费者。

拉模式(PullConsumer)

拉模式下,消费者主动从 RocketMQ 服务器拉取消息。这种方式可以让消费者更灵活地控制消息拉取的时机和频率。

import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;

import java.util.List;
import java.util.Set;

public class PullConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("group5");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.start();

        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest");
        for (MessageQueue mq : mqs) {
            System.out.printf("Consume from the queue: %s%n", mq);
            long offset = consumer.fetchConsumeOffset(mq, true);
            while (true) {
                PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, offset, 32);
                switch (pullResult.getPullStatus()) {
                    case FOUND:
                        List<MessageExt> msgList = pullResult.getMsgFoundList();
                        for (MessageExt msg : msgList) {
                            System.out.printf("Receive New Messages: %s %n", msg);
                        }
                        offset = pullResult.getNextBeginOffset();
                        break;
                    case NO_NEW_MSG:
                        break;
                    case NO_MATCHED_MSG:
                        break;
                    case OFFSET_ILLEGAL:
                        break;
                    default:
                        break;
                }
            }
        }
        consumer.shutdown();
    }
}

代码中创建 DefaultMQPullConsumer 实例并启动。通过 consumer.fetchSubscribeMessageQueues("TopicTest") 获取 “TopicTest” 主题的所有队列。然后在每个队列上通过 consumer.pullBlockIfNotFound 方法拉取消息,根据拉取结果进行相应处理,最后关闭消费者。

高级特性与最佳实践

消息顺序性

在某些场景下,例如订单处理流程,消息的顺序至关重要。RocketMQ 支持局部顺序消息,即保证在一个队列内消息的顺序性。

  1. 顺序发送
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

import java.util.List;

public class OrderedProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("group6");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        List<MessageQueue> mqs = producer.fetchPublishMessageQueues("OrderTopic");
        int queueIndex = 0;
        for (int i = 0; i < 10; i++) {
            String orderId = "Order_001";
            int queue = Math.abs(orderId.hashCode() % mqs.size());
            Message message = new Message("OrderTopic", ("Order Message " + i).getBytes("UTF - 8"));
            SendResult sendResult = producer.send(message, mqs.get(queue));
            System.out.printf("%s%n", sendResult);
        }

        producer.shutdown();
    }
}

在上述代码中,通过 producer.fetchPublishMessageQueues("OrderTopic") 获取 “OrderTopic” 的队列列表。根据订单 ID 的哈希值选择队列,确保相同订单 ID 的消息发送到同一个队列,从而保证顺序。

  1. 顺序消费
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class OrderedConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group7");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("OrderTopic", "*");
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }
}

在消费者端,通过注册 MessageListenerOrderly 监听器,RocketMQ 会保证在一个队列内消息按顺序消费。

消息过滤

RocketMQ 支持通过 Tag 和 SQL92 表达式进行消息过滤。

  1. Tag 过滤:在前面的推模式和拉模式示例中,已经展示了通过 Tag 进行消息过滤,例如 consumer.subscribe("TopicTest", "TagA || TagB"),只接收 “TopicTest” 主题下 “TagA” 和 “TagB” 的消息。
  2. SQL92 过滤: 首先在生产者发送消息时设置属性:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class SqlProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("group8");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        for (int i = 0; i < 10; i++) {
            Message message = new Message("SqlTopic", ("SQL Message " + i).getBytes("UTF - 8"));
            message.putUserProperty("age", String.valueOf(i));
            SendResult sendResult = producer.send(message);
            System.out.printf("%s%n", sendResult);
        }

        producer.shutdown();
    }
}

然后在消费者端使用 SQL92 表达式过滤:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class SqlConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group9");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("SqlTopic", "age > 5");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }
}

在上述代码中,生产者在消息中设置了 “age” 属性,消费者通过 consumer.subscribe("SqlTopic", "age > 5") 过滤出 “age” 属性大于 5 的消息。

事务消息

在一些分布式事务场景中,需要保证消息发送与业务操作的原子性。RocketMQ 提供了事务消息功能。

  1. 事务生产者
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

public class TransactionProducer {
    public static void main(String[] args) throws Exception {
        TransactionMQProducer producer = new TransactionMQProducer("group10");
        producer.setNamesrvAddr("localhost:9876");
        producer.setTransactionListener(new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                // 执行本地业务逻辑
                System.out.println("Execute local transaction: " + new String(msg.getBody()));
                return LocalTransactionState.COMMIT_MESSAGE;
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                // 检查本地事务状态
                System.out.println("Check local transaction: " + new String(msg.getBody()));
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });
        producer.start();

        Message message = new Message("TransactionTopic", ("Transaction Message").getBytes("UTF - 8"));
        SendResult sendResult = producer.sendMessageInTransaction(message, null);
        System.out.printf("%s%n", sendResult);

        producer.shutdown();
    }
}

在上述代码中,创建 TransactionMQProducer 实例,并设置事务监听器。在 executeLocalTransaction 方法中执行本地业务逻辑,根据业务执行结果返回 LocalTransactionState.COMMIT_MESSAGE(提交消息)、LocalTransactionState.ROLLBACK_MESSAGE(回滚消息)或 LocalTransactionState.UNKNOW(未知状态,RocketMQ 会进行事务状态检查)。在 checkLocalTransaction 方法中检查本地事务状态。

  1. 事务消费者: 事务消费者与普通消费者类似,只是处理事务消息时,需要确保幂等性,防止重复消费。
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class TransactionConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group11");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("TransactionTopic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }
}

在消费者端,通过正常的消息监听器处理事务消息,在处理业务逻辑时,通过检查业务状态等方式确保幂等性。

高可用性与负载均衡

  1. NameServer 高可用性:NameServer 本身是无状态的,可以部署多个 NameServer 实例,生产者和消费者通过配置多个 NameServer 地址来实现高可用性,例如 producer.setNamesrvAddr("localhost:9876;localhost:9877")
  2. Broker 高可用性:RocketMQ 支持 Master - Slave 架构,Master 负责处理读写请求,Slave 从 Master 同步数据。当 Master 出现故障时,Slave 可以切换为 Master 继续提供服务。
  3. 负载均衡
    • 生产者负载均衡:生产者在发送消息时,会根据负载均衡策略选择队列。默认的负载均衡策略是轮询,即依次选择队列发送消息。可以通过实现 MessageQueueSelector 接口来自定义负载均衡策略。
    • 消费者负载均衡:消费者组内的消费者会自动进行负载均衡,分配不同的队列给不同的消费者进行消费。如果新增或减少消费者,RocketMQ 会自动重新分配队列。

性能优化

  1. 批量发送消息:生产者可以批量发送消息,减少网络开销。例如:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

import java.util.ArrayList;
import java.util.List;

public class BatchProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("group12");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        List<Message> messages = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            Message message = new Message("BatchTopic", ("Batch Message " + i).getBytes("UTF - 8"));
            messages.add(message);
        }

        SendResult sendResult = producer.send(messages);
        System.out.printf("%s%n", sendResult);

        producer.shutdown();
    }
}

在上述代码中,创建了一个消息列表,将多个消息添加到列表中,然后通过 producer.send(messages) 批量发送消息。

  1. 合理设置线程池:消费者在处理消息时,可以根据业务需求合理设置线程池大小。例如,在 DefaultMQPushConsumer 中,可以通过 consumer.setConsumeThreadMin(int)consumer.setConsumeThreadMax(int) 方法设置最小和最大消费线程数。
  2. 优化消息存储:可以通过调整 RocketMQ 的存储配置,如刷盘策略(同步刷盘、异步刷盘)、文件存储路径等,来提高消息存储性能。同步刷盘保证数据的可靠性,但性能相对较低;异步刷盘性能较高,但可能会丢失少量数据。

常见问题与解决方法

  1. 消息发送失败
    • 原因:可能是 NameServer 地址配置错误、网络问题、Topic 不存在等。
    • 解决方法:检查 NameServer 地址是否正确,确保网络连接正常,通过 RocketMQ 控制台或命令行工具检查 Topic 是否存在。
  2. 消息消费失败
    • 原因:可能是消费逻辑异常、消息格式错误等。
    • 解决方法:在消费监听器中捕获异常,打印详细的错误信息,检查消息格式是否符合业务要求。
  3. 队列负载不均衡
    • 原因:可能是消费者数量与队列数量不匹配、消费者负载能力差异较大等。
    • 解决方法:根据业务流量合理调整消费者数量和队列数量,确保消费者负载能力相近。可以通过监控工具查看队列的消费进度和消费者的负载情况,进行相应调整。

通过深入理解和合理运用 RocketMQ 的 API,结合上述最佳实践和性能优化方法,可以构建出高效、可靠的分布式消息系统,满足各种复杂业务场景的需求。在实际应用中,还需要根据具体业务需求和系统环境进行适当的调整和优化。