MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Netty高效接收网络连接机制解析

2021-11-276.9k 阅读

Netty简介

Netty是一个基于Java NIO的高性能、异步事件驱动的网络应用框架,它提供了对TCP、UDP和HTTP等多种协议的支持。Netty旨在简化和加速网络编程,使得开发人员能够专注于业务逻辑,而不必处理底层网络通信的复杂细节。

在现代的后端开发中,高并发的网络连接处理是一个关键问题。Netty通过其高效的网络连接接收机制,能够轻松应对大量并发连接的场景,提高系统的性能和稳定性。

网络编程基础回顾

在深入了解Netty的高效接收网络连接机制之前,我们先来回顾一些网络编程的基础知识。

网络通信模型

常见的网络通信模型有阻塞I/O(Blocking I/O)、非阻塞I/O(Non - Blocking I/O)和异步I/O(Asynchronous I/O)。

  1. 阻塞I/O:在阻塞I/O模型中,当执行读或写操作时,线程会被阻塞,直到操作完成。例如,在Java的传统I/O中,InputStream.read()方法会阻塞线程,直到有数据可读。这种模型简单直观,但在处理多个连接时效率低下,因为一个线程在等待I/O操作时无法处理其他任务。
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;

public class BlockingIoServer {
    public static void main(String[] args) {
        try (ServerSocket serverSocket = new ServerSocket(8080)) {
            while (true) {
                try (Socket clientSocket = serverSocket.accept()) {
                    InputStream inputStream = clientSocket.getInputStream();
                    byte[] buffer = new byte[1024];
                    int bytesRead = inputStream.read(buffer);
                    while (bytesRead != -1) {
                        System.out.println("Received: " + new String(buffer, 0, bytesRead));
                        bytesRead = inputStream.read(buffer);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
  1. 非阻塞I/O:非阻塞I/O允许在I/O操作未完成时,线程不会被阻塞,可以继续执行其他任务。在Java NIO中,通过使用SelectorChannel实现非阻塞I/O。Selector可以监听多个Channel上的事件,如连接建立、数据可读等。当有事件发生时,Selector会通知应用程序进行处理。
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class NonBlockingIoServer {
    public static void main(String[] args) {
        try (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
             Selector selector = Selector.open()) {
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.bind(new InetSocketAddress(8080));
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

            while (true) {
                selector.select();
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
                while (keyIterator.hasNext()) {
                    SelectionKey key = keyIterator.next();
                    if (key.isAcceptable()) {
                        ServerSocketChannel server = (ServerSocketChannel) key.channel();
                        SocketChannel client = server.accept();
                        client.configureBlocking(false);
                        client.register(selector, SelectionKey.OP_READ);
                    } else if (key.isReadable()) {
                        SocketChannel client = (SocketChannel) key.channel();
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        int bytesRead = client.read(buffer);
                        if (bytesRead > 0) {
                            buffer.flip();
                            System.out.println("Received: " + new String(buffer.array(), 0, bytesRead));
                        }
                    }
                    keyIterator.remove();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
  1. 异步I/O:异步I/O进一步提升了性能,它允许应用程序在发起I/O操作后继续执行其他任务,而I/O操作完成后会通过回调或Future通知应用程序。Java 7引入的NIO.2(AIO)提供了异步I/O的支持。
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;

public class AsynchronousIoServer {
    private static final int PORT = 8080;
    private static final CountDownLatch latch = new CountDownLatch(1);

    public static void main(String[] args) {
        try (AsynchronousSocketChannel serverChannel = AsynchronousSocketChannel.open()) {
            serverChannel.bind(new InetSocketAddress(PORT));
            System.out.println("Server started on port " + PORT);
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            serverChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
                @Override
                public void completed(AsynchronousSocketChannel client, Void attachment) {
                    serverChannel.accept(null, this);
                    try {
                        client.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
                            @Override
                            public void completed(Integer result, ByteBuffer buffer) {
                                if (result > 0) {
                                    buffer.flip();
                                    System.out.println("Received: " + new String(buffer.array(), 0, result));
                                    buffer.clear();
                                    client.read(buffer, buffer, this);
                                }
                            }

                            @Override
                            public void failed(Throwable exc, ByteBuffer attachment) {
                                try {
                                    client.close();
                                } catch (IOException e) {
                                    e.printStackTrace();
                                }
                            }
                        });
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }

                @Override
                public void failed(Throwable exc, Void attachment) {
                    exc.printStackTrace();
                }
            });
            latch.await();
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}

TCP协议与连接管理

TCP(Transmission Control Protocol)是一种面向连接的、可靠的传输协议。在TCP连接建立过程中,需要经过三次握手:

  1. 第一次握手:客户端向服务器发送一个SYN(Synchronize)包,请求建立连接。
  2. 第二次握手:服务器收到SYN包后,回复一个SYN + ACK(Acknowledgment)包,确认收到客户端的请求,并请求客户端确认。
  3. 第三次握手:客户端收到SYN + ACK包后,回复一个ACK包,连接建立完成。

在连接关闭时,需要经过四次挥手:

  1. 第一次挥手:客户端发送一个FIN(Finish)包,请求关闭连接。
  2. 第二次挥手:服务器收到FIN包后,回复一个ACK包,确认收到客户端的关闭请求。
  3. 第三次挥手:服务器处理完剩余数据后,发送一个FIN包,请求关闭连接。
  4. 第四次挥手:客户端收到服务器的FIN包后,回复一个ACK包,连接彻底关闭。

Netty的高效接收网络连接机制

Netty基于Java NIO构建,采用了一系列优化技术来实现高效的网络连接接收。

Reactor模式

Netty采用了经典的Reactor模式。Reactor模式是一种基于事件驱动的设计模式,它通过一个或多个线程来监听事件源(如Socket连接),当有事件发生时,将事件分发给相应的事件处理器进行处理。

在Netty中,有两种类型的Reactor线程:

  1. Boss Group:负责接收新的连接请求,并将新连接分配给Worker Group中的某个线程。
  2. Worker Group:负责处理已建立连接的I/O操作,如读、写数据。

以下是一个简单的Netty服务端启动代码示例,展示了Boss Group和Worker Group的使用:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class NettyServer {
    private final int port;

    public NettyServer(int port) {
        this.port = port;
    }

    public void run() throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
              .channel(NioServerSocketChannel.class)
              .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new NettyServerHandler());
                    }
                })
              .option(ChannelOption.SO_BACKLOG, 128)
              .childOption(ChannelOption.SO_KEEPALIVE, true);

            ChannelFuture f = b.bind(port).sync();
            System.out.println("Server started on port " + port);
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new NettyServer(8080).run();
    }
}

线程模型优化

  1. 线程数量的动态调整:Netty允许根据系统的负载情况动态调整线程数量。在启动时,可以根据CPU核心数等因素来初始化Boss Group和Worker Group的线程数量。例如,在创建NioEventLoopGroup时,可以传入线程数量参数:
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2);
  1. 减少线程上下文切换:Netty通过将I/O操作尽量分配到固定的线程上执行,减少了线程上下文切换的开销。每个NioEventLoop负责处理一组连接的I/O操作,使得线程在处理I/O时具有更好的局部性。

内存管理优化

  1. PooledByteBufAllocator:Netty提供了PooledByteBufAllocator来实现内存池化。通过预先分配一定数量的内存块,并在需要时从内存池中获取,避免了频繁的内存分配和释放操作。这大大减少了垃圾回收的压力,提高了性能。
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
  .channel(NioServerSocketChannel.class)
  .childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(new NettyServerHandler());
        }
    })
  .option(ChannelOption.SO_BACKLOG, 128)
  .childOption(ChannelOption.SO_KEEPALIVE, true)
  .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
  1. DirectByteBuffer:Netty优先使用DirectByteBuffer,它直接在堆外分配内存,避免了数据在堆内和堆外内存之间的拷贝,提高了I/O操作的效率。

零拷贝技术

Netty在数据传输过程中广泛应用了零拷贝技术。零拷贝是指在数据传输过程中,避免数据在用户空间和内核空间之间的多次拷贝,从而提高数据传输的效率。

  1. FileRegion:在Netty中,当发送文件内容时,可以使用FileRegion实现零拷贝。FileRegion直接将文件的内容通过Socket发送出去,而不需要先将文件内容读入内存再发送。
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.FileRegion;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.CharsetUtil;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;

public class NettyServerHandler extends SimpleChannelInboundHandler<HttpRequest> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, HttpRequest request) throws Exception {
        File file = new File("example.txt");
        RandomAccessFile raf;
        try {
            raf = new RandomAccessFile(file, "r");
        } catch (FileNotFoundException e) {
            sendError(ctx, HttpResponseStatus.NOT_FOUND);
            return;
        }
        long fileLength = raf.length();
        FullHttpResponse response = new DefaultFullHttpResponse(
                HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
        response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");
        response.headers().set(HttpHeaderNames.CONTENT_LENGTH, fileLength);
        response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);

        FileChannel fileChannel = raf.getChannel();
        ctx.write(response);
        ctx.write(fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, fileLength));
        ctx.writeAndFlush(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK));
        raf.close();
    }

    private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
        FullHttpResponse response = new DefaultFullHttpResponse(
                HttpVersion.HTTP_1_1, status,
                ctx.alloc().buffer(0).writeBytes("Failure: ".getBytes(CharsetUtil.UTF_8)));
        response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");
        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
  1. CompositeByteBufCompositeByteBuf允许将多个ByteBuf组合成一个逻辑上的ByteBuf,而不需要进行实际的数据拷贝。在处理网络数据时,常常需要将包头和包体等不同部分的数据组合在一起,CompositeByteBuf可以高效地实现这一功能。
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;

public class CompositeByteBufExample {
    public static void main(String[] args) {
        ByteBuf header = Unpooled.wrappedBuffer("Header: ".getBytes());
        ByteBuf body = Unpooled.wrappedBuffer("This is the body".getBytes());
        CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
        compositeByteBuf.addComponents(header, body);
        byte[] array = new byte[compositeByteBuf.readableBytes()];
        compositeByteBuf.getBytes(0, array);
        System.out.println(new String(array));
        compositeByteBuf.release();
    }
}

深入Netty连接接收源码分析

ServerBootstrap启动过程

  1. 创建EventLoopGroupServerBootstrap在启动时,首先创建BossGroupWorkerGroupNioEventLoopGroup是Netty提供的基于NIO的事件循环组。
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
  1. 设置Channel类型:通过.channel(NioServerSocketChannel.class)设置服务器端的Channel类型为NioServerSocketChannel,它基于Java NIO的ServerSocketChannel实现。

  2. 设置Handler:通过.childHandler(new ChannelInitializer<SocketChannel>())为新连接设置处理器。在ChannelInitializer中,可以添加各种ChannelHandler来处理I/O事件和业务逻辑。

  3. 绑定端口:通过.bind(port).sync()方法绑定服务器到指定端口,并等待绑定操作完成。

NioServerSocketChannel的初始化

  1. 创建ServerSocketChannelNioServerSocketChannel在初始化时,会创建一个Java NIO的ServerSocketChannel,并将其配置为非阻塞模式。
public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

private static ServerSocketChannel newSocket(SelectorProvider provider) {
    try {
        return provider.openServerSocketChannel();
    } catch (IOException e) {
        throw new ChannelException("Failed to open a server socket.", e);
    }
}
  1. 注册到SelectorNioServerSocketChannel会将自身注册到BossGroupSelector上,监听OP_ACCEPT事件,即新连接到来的事件。
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                eventLoop().selectNow();
                selected = true;
            } else {
                throw e;
            }
        }
    }
}

新连接的接收与分配

  1. BossGroup处理新连接:当BossGroupSelector监听到OP_ACCEPT事件时,会调用NioServerSocketChanneldoReadMessages方法来接收新连接。
protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = SocketUtils.accept(javaChannel());
    try {
        if (ch != null) {
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);
        try {
            ch.close();
        } catch (IOException e) {
            if (logger.isWarnEnabled()) {
                logger.warn("Failed to close a socket.", e);
            }
        }
    }
    return 0;
}
  1. 分配到WorkerGroup:接收到新连接后,BossGroup会将新连接分配给WorkerGroup中的一个NioEventLoop。具体分配方式是通过EventLoopGroupnext()方法,按照一定的策略选择一个NioEventLoop
public EventLoop next() {
    return (EventLoop) children[Math.abs(childIndex.getAndIncrement() % children.length)];
}

WorkerGroup处理连接I/O

  1. 注册到WorkerGroup的Selector:新连接分配到WorkerGroupNioEventLoop后,会将自身注册到该NioEventLoopSelector上,监听OP_READ事件,以便处理读数据操作。

  2. 读数据操作:当WorkerGroupSelector监听到OP_READ事件时,会调用相应的ChannelHandler来处理读数据。在ChannelHandler中,可以对读取到的数据进行解码、业务逻辑处理等操作。

public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        byte[] bytes = new byte[byteBuf.readableBytes()];
        byteBuf.readBytes(bytes);
        System.out.println("Received: " + new String(bytes));
    }
}

性能测试与优化建议

性能测试

  1. 测试工具:可以使用工具如JMeter、Gatling等对Netty服务端进行性能测试。通过模拟大量并发连接,测试Netty服务端的吞吐量、响应时间等性能指标。

  2. 测试场景:设置不同的并发连接数、消息大小和发送频率等参数,模拟实际应用中的各种场景。例如,测试在高并发短消息和低并发长消息场景下Netty的性能表现。

优化建议

  1. 合理配置线程数量:根据服务器的硬件资源和业务负载,合理调整BossGroupWorkerGroup的线程数量。如果是CPU密集型业务,可以适当减少线程数量;如果是I/O密集型业务,可以适当增加线程数量。

  2. 优化内存管理:充分利用Netty的内存池化机制,避免频繁的内存分配和释放。同时,合理设置ByteBuf的大小,避免内存浪费。

  3. 使用合适的编解码方式:选择高效的编解码方式,如Protobuf、MessagePack等,减少数据传输和处理的开销。

  4. 监控与调优:通过监控工具如JMX、Prometheus等,实时监控Netty服务端的性能指标,如线程利用率、内存使用情况等,根据监控结果进行针对性的优化。

在后端开发中,Netty的高效接收网络连接机制为处理高并发网络应用提供了强大的支持。通过深入理解其原理和优化技术,并结合实际业务场景进行合理配置和调优,可以构建出高性能、稳定可靠的网络应用。