MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Spring WebSocket:在Web应用中实现实时双向通信

2023-09-187.3k 阅读

Spring WebSocket基础概念

在深入探讨Spring WebSocket之前,我们先来理解WebSocket协议本身。WebSocket是一种在单个TCP连接上进行全双工通信的协议,它使得客户端和服务器之间能够进行实时双向通信。与传统的HTTP协议不同,HTTP是一种请求 - 响应模式的协议,在完成请求 - 响应周期后连接就会关闭。而WebSocket协议在建立连接后,双方可以随时主动发送消息,这种特性使得它非常适合实现实时应用,如聊天应用、实时数据监控、在线游戏等。

Spring WebSocket是Spring框架对WebSocket协议的支持,它提供了一系列的API和工具,使得在Spring应用中集成WebSocket变得更加容易。Spring WebSocket主要包括以下几个核心部分:

  • WebSocket配置:通过Java配置类或者XML配置文件来配置WebSocket相关的参数,如端点映射、消息编解码器等。
  • WebSocket端点:定义了客户端连接的入口点,类似于HTTP中的控制器。每个端点可以处理特定路径的WebSocket连接,并处理接收到的消息。
  • 消息处理:负责处理从客户端接收到的消息,以及向客户端发送消息。Spring提供了多种消息处理策略,包括简单的文本消息处理和复杂的对象消息处理。

Spring WebSocket的优势

  1. 与Spring生态系统集成:Spring WebSocket可以无缝集成到Spring Boot、Spring MVC等Spring框架的其他模块中。这意味着开发者可以利用Spring的依赖注入、事务管理、安全机制等强大功能来构建WebSocket应用。例如,在一个基于Spring Boot的Web应用中,可以很方便地将WebSocket端点与其他业务逻辑组件整合在一起,共享相同的依赖和配置。
  2. 灵活性和可扩展性:Spring WebSocket提供了丰富的扩展点,开发者可以根据具体需求定制WebSocket的行为。比如,可以自定义消息编解码器,以支持特定的数据格式(如JSON、XML等);也可以实现自定义的握手处理器,在WebSocket连接建立时进行额外的验证和处理。
  3. 性能优化:Spring WebSocket在设计上考虑了性能问题,通过合理的线程管理和资源分配,能够处理大量的并发WebSocket连接。例如,它采用了高效的I/O模型,如NIO(Non - blocking I/O),来提高数据传输的效率,减少线程阻塞,从而提升整个应用的性能。

搭建Spring WebSocket开发环境

  1. Maven依赖:如果使用Maven来管理项目依赖,需要在pom.xml文件中添加Spring WebSocket相关的依赖。对于Spring Boot项目,只需要添加spring - boot - starter - websocket依赖即可:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring - boot - starter - websocket</artifactId>
</dependency>

对于传统的Spring项目,需要添加spring - websocketspring - messaging依赖:

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring - websocket</artifactId>
    <version>5.3.18</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring - messaging</artifactId>
    <version>5.3.18</version>
</dependency>
  1. 配置WebSocket:在Spring Boot项目中,可以通过创建一个配置类来配置WebSocket。例如:
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.setApplicationDestinationPrefixes("/app");
        config.setUserDestinationPrefix("/user");
        config.enableSimpleBroker("/topic", "/queue");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/websocket - endpoint").withSockJS();
    }
}

在上述配置中:

  • @EnableWebSocketMessageBroker注解开启了WebSocket消息代理功能。
  • configureMessageBroker方法配置了消息代理的相关规则。setApplicationDestinationPrefixes方法设置了应用程序目标的前缀,客户端发送到以/app开头的目的地的消息将被路由到相应的控制器方法。setUserDestinationPrefix方法设置了用户特定目的地的前缀。enableSimpleBroker方法启用了一个简单的消息代理,客户端可以订阅以/topic/queue开头的目的地。
  • registerStompEndpoints方法注册了一个STOMP端点,/websocket - endpoint是端点的路径,.withSockJS()表示支持SockJS协议,SockJS是一个在不支持WebSocket的浏览器中提供WebSocket - like功能的协议。

WebSocket端点的创建与处理

  1. 创建WebSocket端点:在Spring中,可以通过创建一个带有@MessageMapping注解的方法来定义WebSocket端点。例如,创建一个简单的聊天端点:
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Controller;

@Controller
public class ChatController {

    @MessageMapping("/chat")
    @SendTo("/topic/public - chat")
    public String handleChatMessage(String message) {
        return "Received message: " + message;
    }
}

在上述代码中:

  • @Controller注解表明这是一个Spring MVC控制器,同时也用于处理WebSocket消息。
  • @MessageMapping("/chat")注解将handleChatMessage方法映射到/chat目的地。当客户端向/app/chat发送消息时,该方法会被调用。
  • @SendTo("/topic/public - chat")注解表示将方法的返回值发送到/topic/public - chat目的地,所有订阅了该主题的客户端都将收到这条消息。
  1. 处理不同类型的消息:除了简单的文本消息,Spring WebSocket还可以处理复杂的对象消息。首先,定义一个消息对象:
public class ChatMessage {
    private String sender;
    private String content;

    // getters and setters
    public String getSender() {
        return sender;
    }

    public void setSender(String sender) {
        this.sender = sender;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }
}

然后,修改控制器方法来处理这个对象消息:

import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Controller;

@Controller
public class ChatController {

    @MessageMapping("/chat")
    @SendTo("/topic/public - chat")
    public ChatMessage handleChatMessage(ChatMessage message) {
        return new ChatMessage("Server", "Received from " + message.getSender() + ": " + message.getContent());
    }
}

在这个例子中,客户端发送一个ChatMessage对象到/app/chat,服务器处理后返回一个新的ChatMessage对象到/topic/public - chat

消息编解码

  1. 默认编解码器:Spring WebSocket默认使用SimpleTextMessageCodec来处理文本消息,使用Jackson2TextMessageCodec来处理JSON格式的对象消息(前提是项目中引入了Jackson库)。例如,当客户端发送一个JSON格式的ChatMessage对象时,Spring会自动使用Jackson2TextMessageCodec将其解码为Java对象,然后传递给相应的消息处理方法。

  2. 自定义编解码器:如果需要支持其他数据格式,如XML,可以自定义编解码器。首先,创建一个XML编解码器类:

import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.messaging.support.MessageSupport;
import org.springframework.stereotype.Component;
import org.w3c.dom.Document;
import org.xml.sax.SAXException;

import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.transform.OutputKeys;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerException;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;

@Component
public class XmlMessageConverter implements MessageConverter {

    @Override
    public boolean canConvertFrom(Message<?> message, Class<?> targetClass) {
        StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
        return StompCommand.SEND.equals(accessor.getCommand()) && targetClass.isAssignableFrom(ChatMessage.class);
    }

    @Override
    public boolean canConvertTo(Object payload, MessageHeaders headers) {
        return payload instanceof ChatMessage;
    }

    @Override
    public Object fromMessage(Message<?> message, Class<?> targetClass) {
        String payload = (String) message.getPayload();
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            DocumentBuilder builder = factory.newDocumentBuilder();
            Document doc = builder.parse(new StringReader(payload));
            String sender = doc.getElementsByTagName("sender").item(0).getTextContent();
            String content = doc.getElementsByTagName("content").item(0).getTextContent();
            ChatMessage chatMessage = new ChatMessage();
            chatMessage.setSender(sender);
            chatMessage.setContent(content);
            return chatMessage;
        } catch (ParserConfigurationException | SAXException | IOException e) {
            e.printStackTrace();
            return null;
        }
    }

    @Override
    public Message<?> toMessage(Object payload, MessageHeaders headers) {
        ChatMessage chatMessage = (ChatMessage) payload;
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            DocumentBuilder builder = factory.newDocumentBuilder();
            Document doc = builder.newDocument();
            org.w3c.dom.Element root = doc.createElement("chatMessage");
            doc.appendChild(root);
            org.w3c.dom.Element senderElement = doc.createElement("sender");
            senderElement.setTextContent(chatMessage.getSender());
            root.appendChild(senderElement);
            org.w3c.dom.Element contentElement = doc.createElement("content");
            contentElement.setTextContent(chatMessage.getContent());
            root.appendChild(contentElement);
            TransformerFactory transformerFactory = TransformerFactory.newInstance();
            Transformer transformer = transformerFactory.newTransformer();
            transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
            DOMSource source = new DOMSource(doc);
            StringWriter writer = new StringWriter();
            StreamResult result = new StreamResult(writer);
            transformer.transform(source, result);
            String xmlPayload = writer.toString();
            return MessageSupport.createMessage(xmlPayload, headers);
        } catch (ParserConfigurationException | TransformerException e) {
            e.printStackTrace();
            return null;
        }
    }
}

然后,在WebSocket配置类中注册这个自定义编解码器:

import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

import java.util.List;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.setApplicationDestinationPrefixes("/app");
        config.setUserDestinationPrefix("/user");
        config.enableSimpleBroker("/topic", "/queue");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/websocket - endpoint").withSockJS();
    }

    @Override
    public void configureMessageConverters(List<MessageConverter> converters) {
        converters.add(new XmlMessageConverter());
    }
}

这样,Spring WebSocket就可以处理XML格式的ChatMessage消息了。

处理WebSocket连接生命周期

  1. 连接建立:可以通过实现ChannelInterceptor接口来处理WebSocket连接建立的逻辑。例如,记录连接信息:
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.messaging.support.MessagePostProcessor;
import org.springframework.stereotype.Component;

@Component
public class WebSocketChannelInterceptor implements ChannelInterceptor {

    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
        if (StompCommand.CONNECT.equals(accessor.getCommand())) {
            System.out.println("New WebSocket connection attempt from " + accessor.getFirstNativeHeader("Origin"));
        }
        return message;
    }
}

然后,在WebSocket配置类中注册这个拦截器:

import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

import java.util.List;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.setApplicationDestinationPrefixes("/app");
        config.setUserDestinationPrefix("/user");
        config.enableSimpleBroker("/topic", "/queue");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/websocket - endpoint").withSockJS();
    }

    @Override
    public void configureMessageConverters(List<MessageConverter> converters) {
        // converters.add(new XmlMessageConverter());
    }

    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.interceptors(new WebSocketChannelInterceptor());
    }
}
  1. 连接断开:同样可以在ChannelInterceptor中处理连接断开的逻辑。例如:
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.messaging.support.MessagePostProcessor;
import org.springframework.stereotype.Component;

@Component
public class WebSocketChannelInterceptor implements ChannelInterceptor {

    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
        if (StompCommand.DISCONNECT.equals(accessor.getCommand())) {
            System.out.println("WebSocket connection disconnected from " + accessor.getFirstNativeHeader("Origin"));
        }
        return message;
    }
}

这样,在连接断开时就会打印相应的日志信息。

安全机制

  1. Spring Security集成:Spring WebSocket可以与Spring Security集成,以确保只有经过认证和授权的用户才能建立WebSocket连接。首先,在pom.xml中添加Spring Security依赖:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring - boot - starter - security</artifactId>
</dependency>

然后,创建一个Spring Security配置类:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.config.annotation.web.builders.HttpSecurity;
import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity;
import org.springframework.security.config.annotation.web.configuration.WebSecurityConfigurerAdapter;
import org.springframework.security.core.userdetails.User;
import org.springframework.security.core.userdetails.UserDetails;
import org.springframework.security.core.userdetails.UserDetailsService;
import org.springframework.security.provisioning.InMemoryUserDetailsManager;

@Configuration
@EnableWebSecurity
public class SecurityConfig extends WebSecurityConfigurerAdapter {

    @Override
    protected void configure(HttpSecurity http) throws Exception {
        http
           .authorizeRequests()
               .antMatchers("/websocket - endpoint/**").authenticated()
               .anyRequest().permitAll()
               .and()
           .formLogin();
    }

    @Bean
    @Override
    public UserDetailsService userDetailsService() {
        UserDetails user =
            User.withDefaultPasswordEncoder()
               .username("user")
               .password("password")
               .roles("USER")
               .build();

        return new InMemoryUserDetailsManager(user);
    }
}

在上述配置中:

  • /websocket - endpoint/**路径的请求需要用户认证,其他请求可以被任何人访问。
  • 使用内存中的用户存储来进行用户认证,用户名是user,密码是password,用户角色是USER
  1. 基于角色的授权:可以进一步细化授权规则,根据用户的角色来决定是否允许访问特定的WebSocket端点。例如,只有具有ADMIN角色的用户才能访问某个管理端点:
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.config.annotation.web.builders.HttpSecurity;
import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity;
import org.springframework.security.config.annotation.web.configuration.WebSecurityConfigurerAdapter;
import org.springframework.security.core.userdetails.User;
import org.springframework.security.core.userdetails.UserDetails;
import org.springframework.security.core.userdetails.UserDetailsService;
import org.springframework.security.provisioning.InMemoryUserDetailsManager;

@Configuration
@EnableWebSecurity
public class SecurityConfig extends WebSecurityConfigurerAdapter {

    @Override
    protected void configure(HttpSecurity http) throws Exception {
        http
           .authorizeRequests()
               .antMatchers("/websocket - endpoint/admin - endpoint").hasRole("ADMIN")
               .antMatchers("/websocket - endpoint/**").authenticated()
               .anyRequest().permitAll()
               .and()
           .formLogin();
    }

    @Bean
    @Override
    public UserDetailsService userDetailsService() {
        UserDetails user =
            User.withDefaultPasswordEncoder()
               .username("user")
               .password("password")
               .roles("USER")
               .build();
        UserDetails admin =
            User.withDefaultPasswordEncoder()
               .username("admin")
               .password("adminpass")
               .roles("ADMIN")
               .build();

        return new InMemoryUserDetailsManager(user, admin);
    }
}

这样,只有具有ADMIN角色的用户才能访问/websocket - endpoint/admin - endpoint这个WebSocket端点。

高级应用场景

  1. 集群环境下的WebSocket:在集群环境中,为了确保WebSocket消息能够正确地发送到所有相关的节点,需要使用分布式消息代理。例如,可以使用RabbitMQ作为消息代理。首先,添加RabbitMQ依赖:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring - boot - starter - amqp</artifactId>
</dependency>

然后,修改WebSocket配置类,使用RabbitMQ作为消息代理:

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerProperties;
import org.springframework.messaging.simp.config.StompEndpointRegistry;
import org.springframework.messaging.simp.config.WebSocketMessageBrokerConfigurer;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.messaging.support.MessagePostProcessor;
import org.springframework.messaging.simp.config.ClientInboundChannelRegistration;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Autowired
    private ConnectionFactory connectionFactory;

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.setApplicationDestinationPrefixes("/app");
        config.setUserDestinationPrefix("/user");
        config.enableStompBrokerRelay("/topic", "/queue")
           .setRelayHost("localhost")
           .setRelayPort(61613)
           .setClientLogin("guest")
           .setClientPasscode("guest");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/websocket - endpoint").withSockJS();
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        return factory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(connectionFactory);
    }
}

在这个配置中,使用enableStompBrokerRelay方法配置了RabbitMQ作为STOMP消息代理,这样在集群环境下,各个节点可以通过RabbitMQ来共享和传递WebSocket消息。

  1. 与其他系统集成:Spring WebSocket可以与其他系统进行集成,如与物联网(IoT)设备进行通信。假设我们有一个IoT设备,它通过WebSocket协议与我们的Spring应用进行数据交互。首先,定义一个处理IoT设备消息的端点:
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Controller;

@Controller
public class IoTController {

    @MessageMapping("/iot/data")
    @SendTo("/topic/iot - updates")
    public String handleIoTData(String data) {
        // 处理IoT设备发送的数据
        return "Received IoT data: " + data;
    }
}

然后,在配置类中注册相应的端点:

import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.setApplicationDestinationPrefixes("/app");
        config.setUserDestinationPrefix("/user");
        config.enableSimpleBroker("/topic", "/queue");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/websocket - endpoint").withSockJS();
        registry.addEndpoint("/iot - endpoint").withSockJS();
    }
}

这样,IoT设备可以通过/iot - endpoint连接到Spring应用,并发送数据到/app/iot/data,应用处理后将结果发送到/topic/iot - updates,其他订阅该主题的客户端可以收到IoT设备的数据更新。

通过以上内容,我们全面地了解了Spring WebSocket的各个方面,包括基础概念、搭建环境、端点处理、消息编解码、连接生命周期处理、安全机制以及高级应用场景等。Spring WebSocket为开发者提供了一个强大而灵活的工具,能够轻松地在Web应用中实现实时双向通信。无论是开发简单的聊天应用,还是复杂的实时数据监控系统,Spring WebSocket都能满足需求。