RocketMQ 消息发送原理与实践
RocketMQ 消息发送原理
生产者组(Producer Group)
在 RocketMQ 中,生产者组是一类生产者的集合,这些生产者通常发送相同类型的消息,并且具有相同的发送逻辑。例如,在一个电商系统中,负责发送订单创建消息的生产者可以组成一个生产者组,而负责发送商品库存变更消息的生产者可以组成另一个生产者组。
生产者组在消息发送过程中扮演着重要角色。当某个生产者出现故障时,RocketMQ 可以通过生产者组来确保该组内其他生产者能够继续承担消息发送任务,从而保证整个系统的高可用性。比如,假设某个生产者由于网络问题暂时无法连接到 Broker,RocketMQ 可以自动切换到同一生产者组内的其他生产者来发送消息,避免消息发送的中断。
消息发送流程
-
创建生产者实例 首先,开发者需要创建一个生产者实例,在 Java 客户端中,代码如下:
DefaultMQProducer producer = new DefaultMQProducer("producer_group_name");
这里通过
DefaultMQProducer
类创建了一个生产者实例,并指定了生产者组名称为producer_group_name
。 -
设置相关参数 可以设置一些必要的参数,例如设置 Name Server 地址:
producer.setNamesrvAddr("127.0.0.1:9876");
Name Server 是 RocketMQ 的路由信息管理中心,生产者需要通过 Name Server 来获取 Broker 的地址等信息。
-
启动生产者 调用
start
方法启动生产者:producer.start();
生产者启动后,会向 Name Server 注册自己,并开始监听 Name Server 上的路由信息变化。
-
创建消息 创建要发送的消息实例,示例代码如下:
Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ );
这里创建了一个消息,指定了 Topic 为
TopicTest
,Tag 为TagA
,并设置了消息体内容。Topic 是消息的逻辑分类,Tag 可以进一步对 Topic 中的消息进行细分。 -
发送消息 RocketMQ 提供了多种消息发送方式,包括同步发送、异步发送和单向发送。
- 同步发送:
同步发送会阻塞当前线程,直到消息发送成功或者超时。SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult);
send
方法返回一个SendResult
对象,包含了消息发送的结果信息,如消息在 Broker 上的存储位置等。 - 异步发送:
异步发送不会阻塞当前线程,消息发送后会立即返回。通过producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.printf("Send message success: %s%n", sendResult); } @Override public void onException(Throwable e) { System.out.printf("Send message failed: %s%n", e); } });
SendCallback
接口的onSuccess
和onException
方法来处理消息发送成功和失败的情况。 - 单向发送:
单向发送只负责发送消息,不关心消息是否成功到达 Broker,通常用于对消息可靠性要求不高且发送性能要求较高的场景,如日志收集等。producer.sendOneway(msg);
- 同步发送:
-
关闭生产者 在应用程序结束时,需要关闭生产者,释放资源:
producer.shutdown();
负载均衡
RocketMQ 的生产者在发送消息时会进行负载均衡,以确保消息能够均匀地分布在各个 Broker 节点上。生产者在发送消息前,会从 Name Server 获取 Topic 的路由信息,包括 Topic 对应的队列信息以及队列与 Broker 的映射关系。
生产者在选择队列进行消息发送时,默认采用轮询算法。例如,假设某个 Topic 有 4 个队列,分别分布在两个 Broker 上,生产者在发送消息时,会依次选择这 4 个队列进行发送,从而实现消息在不同队列和 Broker 上的负载均衡。
同时,生产者还会根据 Broker 的负载情况动态调整消息发送的策略。如果某个 Broker 的负载过高,生产者会适当减少向该 Broker 发送消息的频率,将更多消息发送到负载较低的 Broker 上,以保证整个集群的性能和稳定性。
RocketMQ 消息发送实践
简单消息发送示例
下面是一个完整的简单同步消息发送的 Java 示例:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class SimpleProducer {
public static void main(String[] args) throws Exception {
// 创建生产者实例并指定生产者组
DefaultMQProducer producer = new DefaultMQProducer("producer_group_name");
// 设置 Name Server 地址
producer.setNamesrvAddr("127.0.0.1:9876");
// 启动生产者
producer.start();
for (int i = 0; i < 10; i++) {
// 创建消息
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 同步发送消息
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
// 关闭生产者
producer.shutdown();
}
}
在这个示例中,生产者创建了 10 条消息并同步发送到名为 TopicTest
的 Topic 中。运行该代码后,可以在控制台看到每条消息的发送结果。
异步消息发送示例
以下是异步消息发送的 Java 示例:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class AsyncProducer {
public static void main(String[] args) throws Exception {
// 创建生产者实例并指定生产者组
DefaultMQProducer producer = new DefaultMQProducer("producer_group_name");
// 设置 Name Server 地址
producer.setNamesrvAddr("127.0.0.1:9876");
// 启动生产者
producer.start();
for (int i = 0; i < 10; i++) {
final int index = i;
// 创建消息
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 异步发送消息
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("Send message success: index=%d, result=%s%n", index, sendResult);
}
@Override
public void onException(Throwable e) {
System.out.printf("Send message failed: index=%d, exception=%s%n", index, e);
}
});
}
// 等待一段时间,确保异步发送完成
Thread.sleep(2000);
// 关闭生产者
producer.shutdown();
}
}
在这个示例中,生产者以异步方式发送 10 条消息,并通过 SendCallback
处理消息发送的成功和失败情况。由于是异步发送,最后通过 Thread.sleep
等待一段时间,以确保所有异步发送操作完成后再关闭生产者。
单向消息发送示例
以下是单向消息发送的 Java 示例:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class OnewayProducer {
public static void main(String[] args) throws Exception {
// 创建生产者实例并指定生产者组
DefaultMQProducer producer = new DefaultMQProducer("producer_group_name");
// 设置 Name Server 地址
producer.setNamesrvAddr("127.0.0.1:9876");
// 启动生产者
producer.start();
for (int i = 0; i < 10; i++) {
// 创建消息
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 单向发送消息
producer.sendOneway(msg);
}
// 关闭生产者
producer.shutdown();
}
}
此示例中,生产者以单向方式发送 10 条消息,不关心消息是否成功到达 Broker,适用于对消息可靠性要求不高但对发送性能要求较高的场景。
批量消息发送
在某些场景下,批量发送消息可以提高发送效率,减少网络开销。RocketMQ 支持批量发送消息,但需要注意的是,批量消息中的所有消息必须具有相同的 Topic 和刷盘策略(同步刷盘或异步刷盘)。
以下是批量消息发送的 Java 示例:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.ArrayList;
import java.util.List;
public class BatchProducer {
public static void main(String[] args) throws Exception {
// 创建生产者实例并指定生产者组
DefaultMQProducer producer = new DefaultMQProducer("producer_group_name");
// 设置 Name Server 地址
producer.setNamesrvAddr("127.0.0.1:9876");
// 启动生产者
producer.start();
List<Message> messages = new ArrayList<>();
for (int i = 0; i < 10; i++) {
// 创建消息
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
messages.add(msg);
}
// 批量发送消息
SendResult sendResult = producer.send(messages);
System.out.printf("%s%n", sendResult);
// 关闭生产者
producer.shutdown();
}
}
在这个示例中,生产者创建了 10 条消息并将它们添加到一个列表中,然后通过 producer.send(messages)
方法进行批量发送。批量发送时,RocketMQ 会将这些消息作为一个整体发送到 Broker,从而提高发送效率。
事务消息发送
RocketMQ 支持事务消息,用于确保本地事务与消息发送的最终一致性。例如,在一个电商订单系统中,当创建订单时,需要同时扣减库存,这就涉及到本地数据库事务和消息发送事务。如果本地数据库事务成功,但消息发送失败,可能会导致库存已扣减但订单状态未更新等问题。事务消息可以解决这类问题。
-
事务生产者代码示例
import org.apache.rocketmq.client.producer.*; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.common.RemotingHelper; public class TransactionProducer { public static void main(String[] args) throws Exception { TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_group"); producer.setNamesrvAddr("127.0.0.1:9876"); TransactionListener transactionListener = new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { // 执行本地事务 System.out.println("Execute local transaction: " + new String(msg.getBody())); // 模拟本地事务成功 return LocalTransactionState.COMMIT_MESSAGE; // 模拟本地事务失败可返回 LocalTransactionState.ROLLBACK_MESSAGE; // 模拟本地事务状态未知可返回 LocalTransactionState.UNKNOW; } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { // 检查本地事务状态 System.out.println("Check local transaction: " + new String(msg.getBody())); // 模拟检查事务成功 return LocalTransactionState.COMMIT_MESSAGE; // 模拟检查事务失败可返回 LocalTransactionState.ROLLBACK_MESSAGE; } }; producer.setTransactionListener(transactionListener); producer.start(); Message msg = new Message("TransactionTopic", "TagA", "Hello RocketMQ Transaction".getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.sendMessageInTransaction(msg, null); System.out.printf("%s%n", sendResult); Thread.sleep(10000); producer.shutdown(); } }
在这个示例中:
- 创建了一个
TransactionMQProducer
实例,并指定了生产者组为transaction_producer_group
。 - 实现了
TransactionListener
接口,该接口包含两个方法:executeLocalTransaction
方法用于执行本地事务,这里模拟了本地事务成功并返回LocalTransactionState.COMMIT_MESSAGE
。实际应用中,需要在该方法中编写真正的本地事务逻辑,如数据库操作等。checkLocalTransaction
方法用于 RocketMQ Broker 回调生产者检查本地事务状态,这里也模拟了检查事务成功。
- 通过
producer.sendMessageInTransaction
方法发送事务消息,该方法会先发送一条半消息(Half Message)到 Broker,Broker 接收到半消息后会回调executeLocalTransaction
方法执行本地事务,根据本地事务的执行结果决定是否将半消息转换为可消费的消息。如果本地事务执行状态未知,Broker 会定期回调checkLocalTransaction
方法来检查本地事务状态。
- 创建了一个
-
事务消息原理
- 半消息发送:生产者首先向 Broker 发送半消息,半消息对消费者不可见。此时,Broker 会记录该半消息的状态,并向生产者返回发送结果。
- 本地事务执行:生产者接收到半消息发送成功的响应后,执行本地事务,并根据本地事务的执行结果向 Broker 发送二次确认消息,告知 Broker 该事务消息是提交还是回滚。
- 事务状态检查:如果生产者在发送二次确认消息时出现网络问题等异常情况,导致 Broker 未收到确认消息,Broker 会定期回调生产者的
checkLocalTransaction
方法来检查本地事务状态,根据检查结果决定是提交还是回滚该事务消息。
通过事务消息机制,RocketMQ 保证了在分布式系统中,本地事务与消息发送之间的最终一致性,确保消息要么成功发送且本地事务成功,要么两者都失败。
消息发送异常处理
在 RocketMQ 消息发送过程中,可能会出现各种异常情况,开发者需要合理地处理这些异常,以保证系统的稳定性和可靠性。
常见异常类型
- 网络异常:例如生产者与 Broker 之间的网络连接中断,可能会导致消息发送失败。这种情况下,
send
方法可能会抛出MQClientException
或RemotingException
。 - Broker 繁忙异常:当 Broker 负载过高时,可能无法及时处理生产者发送的消息,此时会返回
MQBrokerException
,错误码通常表示 Broker 繁忙。 - 消息格式异常:如果消息的 Topic、Tag 或消息体等不符合要求,如 Topic 不存在等,会抛出
MQClientException
。
异常处理策略
-
重试机制:对于网络异常等可恢复的异常,可以采用重试机制。在 RocketMQ 中,生产者默认会进行重试。例如,同步发送消息时,如果出现
RemotingException
、MQBrokerException
且responseCode
为MQResponseCode.TOPIC_NOT_EXIST
以外的错误,生产者会自动重试 2 次。开发者也可以根据实际情况调整重试次数,示例代码如下:producer.setRetryTimesWhenSendFailed(3);
这里将同步发送消息的重试次数设置为 3 次。
-
记录日志:在捕获到异常时,应及时记录详细的日志信息,包括异常类型、异常消息、消息内容等,以便后续排查问题。例如:
try { SendResult sendResult = producer.send(msg); } catch (Exception e) { // 记录日志 logger.error("Send message failed: ", e); }
-
降级处理:对于一些无法立即解决的异常,如 Broker 长时间繁忙,可以考虑进行降级处理。例如,暂时将消息缓存到本地,待 Broker 恢复正常后再重新发送,或者将消息发送到备用的 Broker 节点。
消息发送性能优化
为了提高 RocketMQ 消息发送的性能,可以从以下几个方面进行优化。
批量发送优化
-
合理控制批量大小:批量发送消息时,需要根据实际情况合理控制批量消息的大小。如果批量消息过大,可能会导致网络传输延迟增加,甚至超过 Broker 的接收限制。一般建议将批量消息的大小控制在 1MB 以内。可以通过以下方式动态调整批量消息的大小:
List<Message> messages = new ArrayList<>(); int totalSize = 0; for (int i = 0; i < 10; i++) { Message msg = new Message("TopicTest", "TagA", ("Message " + i).getBytes()); if (totalSize + msg.getBody().length > 1024 * 1024) { // 发送当前批次消息 producer.send(messages); messages.clear(); totalSize = 0; } messages.add(msg); totalSize += msg.getBody().length; } if (!messages.isEmpty()) { producer.send(messages); }
在这个示例中,通过动态计算消息总大小,当接近 1MB 时,发送当前批次消息,并清空列表准备下一批次。
-
批量消息发送频率:可以适当调整批量消息的发送频率,避免过于频繁地发送小批量消息,增加网络开销。例如,可以设置一个定时器,每隔一定时间发送一次批量消息,而不是每次凑够一定数量就立即发送。
异步发送优化
-
线程池优化:异步发送消息时,RocketMQ 会使用线程池来处理消息发送任务。可以根据系统的负载情况调整线程池的参数,如核心线程数、最大线程数等。例如:
ExecutorService executorService = new ThreadPoolExecutor( 5, 10, 1000 * 60, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("AsyncSendThread"); return thread; } }); producer.setAsyncSenderExecutor(executorService);
这里创建了一个自定义的线程池,并将其设置为生产者的异步发送线程池,通过调整线程池参数,可以更好地适应系统的并发需求。
-
减少回调处理时间:在异步发送的回调函数
SendCallback
中,应尽量减少复杂的业务逻辑处理,避免回调处理时间过长影响后续消息的发送。如果需要进行复杂处理,可以将相关任务提交到其他线程池进行处理。
生产者配置优化
-
Name Server 缓存优化:生产者会缓存 Name Server 返回的路由信息,为了减少 Name Server 的压力,可以适当延长路由信息的缓存时间。可以通过以下配置实现:
producer.setNamesrvAddr("127.0.0.1:9876"); producer.setNamesrvDomain("rocketmq.namesrv.domain"); producer.setNamesrvChannelOptions(Collections.singletonMap( ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)); producer.setDefaultTopicQueueNums(4); producer.setSendMsgTimeout(3000); producer.setCompressMsgBodyOverHowmuch(1024 * 10);
这里通过
setNamesrvChannelOptions
等方法设置了一些 Name Server 相关的配置,如连接超时时间等。同时,setDefaultTopicQueueNums
设置了默认 Topic 的队列数量,setSendMsgTimeout
设置了消息发送超时时间,setCompressMsgBodyOverHowmuch
设置了消息体压缩的阈值。 -
消息压缩:对于消息体较大的情况,可以启用消息压缩功能,减少网络传输的数据量。RocketMQ 支持多种压缩算法,如 ZIP、ZLIB 和 SNAPPY 等。可以通过以下方式启用压缩:
producer.setCompressMsgBodyOverHowmuch(1024 * 10);
这里设置了消息体超过 10KB 时进行压缩。
通过以上性能优化措施,可以有效提高 RocketMQ 消息发送的性能,满足高并发、大数据量的消息发送需求。