MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

RocketMQ 与 Spring Boot 集成实战

2023-12-012.7k 阅读

1. 环境准备

在开始集成RocketMQ与Spring Boot之前,我们需要确保开发环境已经准备妥当。

1.1 安装Java

RocketMQ和Spring Boot都是基于Java开发的,因此首先需要安装Java环境。建议安装JDK 8及以上版本。以Ubuntu系统为例,通过以下命令安装OpenJDK 8:

sudo apt-get update
sudo apt-get install openjdk-8-jdk

安装完成后,通过以下命令检查Java版本:

java -version

1.2 安装Maven

Maven是Java项目的构建工具,我们通过Maven来管理项目依赖。同样在Ubuntu系统下,使用以下命令安装Maven:

sudo apt-get install maven

安装完成后,使用以下命令检查Maven版本:

mvn -version

1.3 安装RocketMQ

RocketMQ的安装可以通过官方提供的二进制包进行。首先从RocketMQ官方GitHub仓库(https://github.com/apache/rocketmq/releases)下载最新的二进制包,例如`rocketmq-all-4.9.3-bin-release.zip`。

下载完成后,解压该压缩包:

unzip rocketmq-all-4.9.3-bin-release.zip
cd rocketmq-all-4.9.3-bin-release

启动NameServer:

nohup sh bin/mqnamesrv &

启动成功后,可以在logs/namesrv.log文件中看到启动日志。接着启动Broker:

nohup sh bin/mqbroker -n localhost:9876 &

同样,启动日志可以在logs/broker.log中查看。

2. 创建Spring Boot项目

我们可以通过Spring Initializr(https://start.spring.io/)来快速创建一个Spring Boot项目。在Spring Initializr页面,进行如下配置:

  • Project:Maven Project
  • Language:Java
  • Spring Boot:选择合适的版本,这里以2.6.7为例
  • Groupcom.example
  • Artifactrocketmq-spring-boot
  • Dependencies:添加Spring WebRocketMQ Starter依赖

完成配置后,点击Generate按钮下载项目压缩包,解压后即可得到一个Spring Boot项目结构。

3. 配置RocketMQ

src/main/resources目录下创建application.properties文件,添加如下RocketMQ相关配置:

rocketmq.name-server=localhost:9876
rocketmq.producer.group=example-group

上述配置中,rocketmq.name-server指定了RocketMQ NameServer的地址,rocketmq.producer.group指定了生产者组的名称。

4. 生产者代码实现

4.1 创建消息发送服务接口

src/main/java/com/example/rocketmqspringboot/service目录下创建MessageSenderService接口:

package com.example.rocketmqspringboot.service;

public interface MessageSenderService {
    void sendMessage(String message);
}

4.2 实现消息发送服务

创建MessageSenderServiceImpl类实现上述接口:

package com.example.rocketmqspringboot.service;

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Service
public class MessageSenderServiceImpl implements MessageSenderService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Override
    public void sendMessage(String message) {
        rocketMQTemplate.send("example-topic", MessageBuilder.withPayload(message).build());
    }
}

在上述代码中,通过RocketMQTemplate发送消息到example-topic主题。RocketMQTemplate是Spring Boot集成RocketMQ提供的用于发送消息的工具类,通过@Autowired注解将其注入到服务类中。

4.3 创建控制器发送消息

src/main/java/com/example/rocketmqspringboot/controller目录下创建MessageController类:

package com.example.rocketmqspringboot.controller;

import com.example.rocketmqspringboot.service.MessageSenderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class MessageController {

    @Autowired
    private MessageSenderService messageSenderService;

    @GetMapping("/sendMessage")
    public String sendMessage(@RequestParam String message) {
        messageSenderService.sendMessage(message);
        return "Message sent successfully: " + message;
    }
}

该控制器提供了一个/sendMessage接口,接收一个message参数,调用MessageSenderServicesendMessage方法发送消息,并返回发送成功的提示。

5. 消费者代码实现

5.1 创建消息监听类

src/main/java/com/example/rocketmqspringboot/consumer目录下创建MessageConsumer类:

package com.example.rocketmqspringboot.consumer;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(topic = "example-topic", consumerGroup = "example-consumer-group")
public class MessageConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        System.out.println("Received message: " + message);
    }
}

在上述代码中,通过@RocketMQMessageListener注解指定了要监听的主题example-topic和消费者组example-consumer-groupMessageConsumer类实现了RocketMQListener接口,并重写onMessage方法来处理接收到的消息。这里简单地将接收到的消息打印出来,实际应用中可以进行更复杂的业务处理。

6. 运行与测试

启动Spring Boot应用,确保RocketMQ的NameServer和Broker都处于运行状态。通过浏览器访问http://localhost:8080/sendMessage?message=Hello,RocketMQ,可以看到控制台输出Message sent successfully: Hello,RocketMQ。同时,在MessageConsumer所在的控制台可以看到打印出Received message: Hello,RocketMQ,表明消息发送和接收都成功。

7. 高级特性与优化

7.1 事务消息

在某些业务场景下,需要保证消息发送与本地事务的一致性,这时候就可以使用RocketMQ的事务消息。

首先,在application.properties中添加事务生产者组配置:

rocketmq.producer.transactionGroup=example-transaction-group

创建事务消息发送服务接口TransactionMessageSenderService

package com.example.rocketmqspringboot.service;

public interface TransactionMessageSenderService {
    void sendTransactionMessage(String message);
}

实现事务消息发送服务TransactionMessageSenderServiceImpl

package com.example.rocketmqspringboot.service;

import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQTransactionCallback;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Service
public class TransactionMessageSenderServiceImpl implements TransactionMessageSenderService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Override
    public void sendTransactionMessage(String message) {
        rocketMQTemplate.sendMessageInTransaction("example-transaction-topic", MessageBuilder.withPayload(message).build(), null);
    }

    @RocketMQTransactionListener(transactionListenerClass = TransactionListenerImpl.class)
    public static class TransactionListenerImpl implements RocketMQLocalTransactionListener {

        @Override
        public RocketMQTransactionCallback prepareLocalTransaction(Message msg, Object arg) {
            return new RocketMQTransactionCallback() {
                @Override
                public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
                    // 执行本地事务
                    // 模拟本地事务成功
                    return RocketMQLocalTransactionState.COMMIT;
                }

                @Override
                public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
                    // 检查本地事务状态
                    // 模拟检查成功
                    return RocketMQLocalTransactionState.COMMIT;
                }
            };
        }
    }
}

在上述代码中,sendTransactionMessage方法通过RocketMQTemplatesendMessageInTransaction方法发送事务消息。@RocketMQTransactionListener注解指定了事务监听器TransactionListenerImpl,在监听器中定义了本地事务的执行和检查逻辑。

创建事务消息控制器TransactionMessageController

package com.example.rocketmqspringboot.controller;

import com.example.rocketmqspringboot.service.TransactionMessageSenderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class TransactionMessageController {

    @Autowired
    private TransactionMessageSenderService transactionMessageSenderService;

    @GetMapping("/sendTransactionMessage")
    public String sendTransactionMessage(@RequestParam String message) {
        transactionMessageSenderService.sendTransactionMessage(message);
        return "Transaction message sent successfully: " + message;
    }
}

通过访问/sendTransactionMessage接口,即可发送事务消息。

7.2 消息顺序消费

在一些场景中,例如订单处理,需要保证消息的顺序性。RocketMQ支持顺序消费。

首先创建顺序消息发送服务接口OrderedMessageSenderService

package com.example.rocketmqspringboot.service;

public interface OrderedMessageSenderService {
    void sendOrderedMessage(String message, String orderKey);
}

实现顺序消息发送服务OrderedMessageSenderServiceImpl

package com.example.rocketmqspringboot.service;

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Service
public class OrderedMessageSenderServiceImpl implements OrderedMessageSenderService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Override
    public void sendOrderedMessage(String message, String orderKey) {
        rocketMQTemplate.send("example-ordered-topic", MessageBuilder.withPayload(message).build(), orderKey);
    }
}

在上述代码中,通过RocketMQTemplatesend方法发送顺序消息,orderKey用于指定消息的顺序。

创建顺序消息消费者OrderedMessageConsumer

package com.example.rocketmqspringboot.consumer;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(topic = "example-ordered-topic", consumerGroup = "example-ordered-consumer-group", selectorType = "ORDERLY")
public class OrderedMessageConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        System.out.println("Received ordered message: " + message);
    }
}

通过@RocketMQMessageListener注解的selectorType = "ORDERLY"指定为顺序消费。

创建顺序消息控制器OrderedMessageController

package com.example.rocketmqspringboot.controller;

import com.example.rocketmqspringboot.service.OrderedMessageSenderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class OrderedMessageController {

    @Autowired
    private OrderedMessageSenderService orderedMessageSenderService;

    @GetMapping("/sendOrderedMessage")
    public String sendOrderedMessage(@RequestParam String message, @RequestParam String orderKey) {
        orderedMessageSenderService.sendOrderedMessage(message, orderKey);
        return "Ordered message sent successfully: " + message;
    }
}

通过访问/sendOrderedMessage接口,可以发送顺序消息并在消费者端按顺序消费。

7.3 消息重试与死信队列

当消息消费失败时,RocketMQ会自动进行重试。默认情况下,消息会重试16次。如果重试16次后仍然失败,消息会被发送到死信队列。

我们可以通过配置消费者的maxReconsumeTimes属性来调整最大重试次数。在application.properties中添加如下配置:

rocketmq.consumer.maxReconsumeTimes=5

上述配置将最大重试次数设置为5次。

8. 性能优化

8.1 生产者性能优化

  • 批量发送消息:生产者可以将多条消息批量发送,减少网络开销。修改MessageSenderServiceImpl类,添加批量发送方法:
package com.example.rocketmqspringboot.service;

import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;

@Service
public class MessageSenderServiceImpl implements MessageSenderService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Override
    public void sendMessage(String message) {
        rocketMQTemplate.send("example-topic", MessageBuilder.withPayload(message).build());
    }

    public void sendBatchMessage(List<String> messages) {
        List<org.springframework.messaging.Message<String>> messageList = new ArrayList<>();
        for (String msg : messages) {
            messageList.add(MessageBuilder.withPayload(msg).build());
        }
        SendResult sendResult = rocketMQTemplate.syncSend("example-topic", messageList);
        System.out.println("Batch message send result: " + sendResult);
    }
}

在控制器中添加批量发送接口:

package com.example.rocketmqspringboot.controller;

import com.example.rocketmqspringboot.service.MessageSenderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.Arrays;
import java.util.List;

@RestController
public class MessageController {

    @Autowired
    private MessageSenderService messageSenderService;

    @GetMapping("/sendMessage")
    public String sendMessage(@RequestParam String message) {
        messageSenderService.sendMessage(message);
        return "Message sent successfully: " + message;
    }

    @GetMapping("/sendBatchMessage")
    public String sendBatchMessage(@RequestParam List<String> messages) {
        messageSenderService.sendBatchMessage(messages);
        return "Batch messages sent successfully: " + messages;
    }
}

通过访问/sendBatchMessage接口,传入多个消息内容,即可实现批量发送。

  • 异步发送消息:使用异步发送可以提高发送性能,避免阻塞。修改MessageSenderServiceImpl类,添加异步发送方法:
package com.example.rocketmqspringboot.service;

import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Service
public class MessageSenderServiceImpl implements MessageSenderService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Override
    public void sendMessage(String message) {
        rocketMQTemplate.send("example-topic", MessageBuilder.withPayload(message).build());
    }

    public void sendAsyncMessage(String message) {
        rocketMQTemplate.asyncSend("example-topic", MessageBuilder.withPayload(message).build(), new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("Async message sent successfully: " + sendResult);
            }

            @Override
            public void onException(Throwable e) {
                System.out.println("Async message send failed: " + e);
            }
        });
    }
}

在控制器中添加异步发送接口:

package com.example.rocketmqspringboot.controller;

import com.example.rocketmqspringboot.service.MessageSenderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class MessageController {

    @Autowired
    private MessageSenderService messageSenderService;

    @GetMapping("/sendMessage")
    public String sendMessage(@RequestParam String message) {
        messageSenderService.sendMessage(message);
        return "Message sent successfully: " + message;
    }

    @GetMapping("/sendAsyncMessage")
    public String sendAsyncMessage(@RequestParam String message) {
        messageSenderService.sendAsyncMessage(message);
        return "Async message sent successfully: " + message;
    }
}

通过访问/sendAsyncMessage接口,即可异步发送消息。

8.2 消费者性能优化

  • 多线程消费:默认情况下,RocketMQ消费者是单线程消费。可以通过配置concurrency属性来增加消费线程数。在application.properties中添加如下配置:
rocketmq.consumer.consumeThreadMin=10
rocketmq.consumer.consumeThreadMax=20

上述配置将消费线程数的最小值设置为10,最大值设置为20。

  • 优化消费逻辑:尽量减少消费逻辑中的复杂计算和I/O操作,将一些耗时操作异步化或者放到线程池中处理,以提高消费速度。

9. 常见问题及解决方法

9.1 无法连接到NameServer

检查application.properties中的rocketmq.name-server配置是否正确,确保NameServer所在主机的IP和端口可访问。同时检查NameServer的启动日志,看是否有异常信息。

9.2 消息发送失败

检查生产者组的配置是否正确,以及主题是否存在。可以通过RocketMQ的控制台工具(如RocketMQ-Console)来查看主题和生产者组的状态。另外,检查网络连接是否正常,以及是否有防火墙限制了RocketMQ的通信端口。

9.3 消息消费失败

检查消费者组的配置是否正确,以及消费逻辑是否有异常。可以在onMessage方法中添加异常捕获,打印详细的错误信息,以便定位问题。同时,注意检查重试次数和死信队列的配置,确保消息在消费失败时能按预期处理。

通过以上步骤,我们完成了RocketMQ与Spring Boot的集成,并对一些高级特性、性能优化以及常见问题进行了探讨和解决。在实际应用中,根据具体的业务需求,可以进一步调整和优化配置,以充分发挥RocketMQ的优势。