MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Netty源码分析之揭开reactor线程的面纱

2024-03-286.4k 阅读

Netty 中的 Reactor 模式概述

在深入 Netty 的 Reactor 线程源码之前,我们先来回顾一下 Reactor 模式的基本概念。Reactor 模式是一种基于事件驱动的设计模式,它用于处理高并发 I/O 操作。在传统的多线程 I/O 处理中,每个连接需要一个独立的线程来处理读写操作,这在高并发场景下会导致大量的线程开销,从而降低系统性能。Reactor 模式通过将 I/O 操作的多路复用与事件处理分离,有效地解决了这个问题。

在 Reactor 模式中,有一个或多个 Reactor 线程负责监听 I/O 事件(如连接建立、数据可读、数据可写等)。当有事件发生时,Reactor 线程将事件分发给对应的事件处理器(Handler)进行处理。这种设计模式使得系统可以高效地处理大量的并发连接,而不需要为每个连接创建一个单独的线程。

Netty 作为一款高性能的网络编程框架,其核心设计采用了 Reactor 模式。Netty 中的 Reactor 线程负责监听网络事件,并将事件分发给相应的 ChannelHandler 进行处理。下面我们将深入 Netty 的源码,剖析 Reactor 线程的实现细节。

Netty 中的 Reactor 线程模型

Netty 提供了两种主要的 Reactor 线程模型:单线程模型和多线程模型。

单线程模型

在单线程模型中,只有一个 Reactor 线程负责监听和处理所有的 I/O 事件。这个线程既要处理新连接的建立,又要处理已连接 Channel 的读写操作。虽然这种模型简单直接,但在高并发场景下,由于所有操作都在一个线程中执行,容易成为性能瓶颈。

多线程模型

Netty 的多线程模型是更为常用的一种模型。在这种模型中,有一组 Reactor 线程负责监听新连接事件(通常称为 Boss 线程组),另一组 Reactor 线程负责处理已连接 Channel 的读写事件(通常称为 Worker 线程组)。这种分工使得系统可以更好地利用多核 CPU 的性能,提高整体的并发处理能力。

Netty 中 Reactor 线程的源码实现

NioEventLoopGroup 类

在 Netty 中,NioEventLoopGroup 类是实现 Reactor 线程模型的关键类之一。它继承自 MultithreadEventLoopGroup,负责管理一组 NioEventLoop 实例。

public class NioEventLoopGroup extends MultithreadEventLoopGroup {
    // 省略其他代码
    public NioEventLoopGroup(int nThreads, Executor executor, SelectorProvider selectorProvider) {
        super(nThreads, executor, selectorProvider);
    }
}

在创建 NioEventLoopGroup 实例时,可以指定线程数量(nThreads)、线程执行器(executor)以及选择器提供者(selectorProvider)。MultithreadEventLoopGroup 类会根据指定的线程数量创建相应数量的 NioEventLoop 实例。

NioEventLoop 类

NioEventLoop 类是 Netty 中真正的 Reactor 线程实现类。它继承自 SingleThreadEventExecutor,并实现了 EventLoop 接口。

public class NioEventLoop extends SingleThreadEventExecutor implements EventLoop {
    private final Selector selector;
    // 省略其他代码
    protected NioEventLoop(EventLoopGroup parent, Executor executor, SelectorProvider selectorProvider) {
        super(parent, executor, false);
        this.selector = openSelector();
    }
    private Selector openSelector() {
        try {
            return selectorProvider.openSelector();
        } catch (IOException e) {
            throw new ChannelException("Failed to open a new selector", e);
        }
    }
}

NioEventLoop 类持有一个 Selector 实例,用于实现 I/O 多路复用。在构造函数中,通过 openSelector() 方法创建一个新的 Selector 实例。

事件循环

NioEventLoop 类的核心是 run() 方法,它实现了 Reactor 线程的事件循环。

protected void run() {
    for (;;) {
        try {
            try {
                int selectCnt = select(wakenUp.getAndSet(false));
                if (selectCnt != 0 || wakenUp.get() || hasTasks()) {
                    cancelledKeys = 0;
                    needsToSelectAgain = false;
                    final int ioRatio = this.ioRatio;
                    if (ioRatio == 100) {
                        try {
                            processSelectedKeys();
                        } finally {
                            runAllTasks();
                        }
                    } else {
                        final long ioStartTime = System.nanoTime();
                        try {
                            processSelectedKeys();
                        } finally {
                            final long ioTime = System.nanoTime() - ioStartTime;
                            runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                        }
                    }
                } else if (unexpectedSelectorWakeup(selectCnt)) {
                    processSelectedKeys();
                }
            } catch (CancelledKeyException e) {
                cancelledKeys++;
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        try {
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}

run() 方法中的事件循环大致分为以下几个步骤:

  1. 调用 select 方法:通过 select(wakenUp.getAndSet(false)) 方法阻塞等待 I/O 事件发生。wakenUp 变量用于标识是否有外部线程唤醒了当前 Reactor 线程。
  2. 处理选中的键:如果有事件发生(selectCnt != 0),或者线程被唤醒(wakenUp.get()),或者任务队列中有任务(hasTasks()),则调用 processSelectedKeys() 方法处理选中的 SelectionKey
  3. 执行任务队列中的任务:在处理完 I/O 事件后,调用 runAllTasks() 方法执行任务队列中的所有任务。任务队列中通常包含一些非 I/O 相关的操作,如用户自定义的业务逻辑。
  4. 处理异常:在事件循环过程中,如果发生异常,通过 handleLoopException(t) 方法进行处理。
  5. 关闭操作:如果当前 Reactor 线程正在关闭(isShuttingDown()),则调用 closeAll() 方法关闭所有相关的 Channel,并通过 confirmShutdown() 方法确认是否可以退出事件循环。

processSelectedKeys 方法

processSelectedKeys() 方法负责处理 Selector 选中的 SelectionKey

private void processSelectedKeys() {
    if (selectedKeys != null) {
        processSelectedKeysOptimized();
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}
private void processSelectedKeysOptimized() {
    for (int i = 0; i < selectedKeys.size; ++i) {
        final SelectionKey k = selectedKeys.keys[i];
        selectedKeys.keys[i] = null;
        final Object a = k.attachment();
        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }
        if (cancelledKeys != 0) {
            selectedKeys.reset(i + 1);
            break;
        }
    }
}
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {
        final EventLoop eventLoop;
        try {
            eventLoop = ch.eventLoop();
        } catch (Throwable ignored) {
            return;
        }
        if (eventLoop != this || eventLoop == null) {
            return;
        }
        unsafe.close(unsafe.voidPromise());
        return;
    }
    try {
        int readyOps = k.readyOps();
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
            if (!ch.isOpen()) {
                return;
            }
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            ch.unsafe().forceFlush();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    } catch (Throwable t) {
        handleLoopException(t);
        try {
            unsafe.close(unsafe.voidPromise());
        } catch (Throwable t2) {
            handleLoopException(t2);
        }
    }
}

processSelectedKeysOptimized() 方法中,遍历 selectedKeys 数组,获取每个 SelectionKey 及其关联的 AbstractNioChannel。然后调用 processSelectedKey(k, ch) 方法处理具体的 I/O 事件。

processSelectedKey(k, ch) 方法中,首先检查 SelectionKey 是否有效。如果无效,则关闭对应的 Channel。然后根据 SelectionKey 的就绪操作(readyOps)判断是读事件(OP_READ)、连接事件(OP_ACCEPT)还是写事件(OP_WRITE),并调用相应的 unsafe 方法进行处理。例如,对于读事件,调用 unsafe.read() 方法读取数据;对于写事件,调用 unsafe.forceFlush() 方法将缓冲区中的数据发送出去。

Reactor 线程与 ChannelHandler 的交互

在 Netty 中,Reactor 线程与 ChannelHandler 之间通过 ChannelPipeline 进行交互。ChannelPipeline 是一个 ChannelHandler 的链表,它负责将 I/O 事件依次传递给链表中的各个 ChannelHandler 进行处理。

当 Reactor 线程处理 I/O 事件时,会调用 Channelunsafe 方法,这些方法会触发 ChannelPipeline 中的事件传播。例如,当读取到数据时,unsafe.read() 方法会调用 ChannelPipelinefireChannelRead() 方法,将读取到的数据传递给 ChannelPipeline 中的第一个 ChannelHandler

public final class DefaultChannelPipeline implements ChannelPipeline {
    // 省略其他代码
    @Override
    public final ChannelPipeline fireChannelRead(Object msg) {
        AbstractChannelHandlerContext.invokeChannelRead(head, msg);
        return this;
    }
}
final class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext {
    // 省略其他代码
    static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRead(m);
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRead(m);
                }
            });
        }
    }
    private void invokeChannelRead(Object msg) {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelRead(this, msg);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelRead(msg);
        }
    }
}

fireChannelRead() 方法中,首先调用 AbstractChannelHandlerContext.invokeChannelRead(head, msg) 方法开始事件传播。invokeChannelRead() 方法会根据当前 ChannelHandlerContext 所在的线程,决定是直接调用 invokeChannelRead(m) 方法,还是将任务提交到对应的 EventExecutor 中执行。

invokeChannelRead() 方法中,如果当前 ChannelHandlerChannelInboundHandler,则调用其 channelRead(this, msg) 方法处理事件。如果处理过程中发生异常,则通过 notifyHandlerException(t) 方法进行处理。如果当前 ChannelHandler 不是 ChannelInboundHandler,则继续调用 fireChannelRead(msg) 方法将事件传递给下一个 ChannelHandler

示例代码

下面我们通过一个简单的 Netty 服务器示例,来展示 Reactor 线程模型的实际应用。

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class NettyServer {
    private static final int PORT = 8888;
    public static void main(String[] args) {
        // 创建 Boss 线程组,负责监听新连接
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        // 创建 Worker 线程组,负责处理已连接 Channel 的读写事件
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                   .channel(NioServerSocketChannel.class)
                   .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new NettyServerHandler());
                        }
                    })
                   .option(ChannelOption.SO_BACKLOG, 128)
                   .childOption(ChannelOption.SO_KEEPALIVE, true);
            // 绑定端口,开始接收进来的连接
            ChannelFuture f = b.bind(PORT).sync();
            System.out.println("Server started on port " + PORT);
            // 等待服务器 socket 关闭
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("Server received: " + msg);
        ctx.writeAndFlush("Message received by server: " + msg);
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

在上述示例中,我们创建了一个 Netty 服务器。通过 NioEventLoopGroup 创建了 Boss 线程组和 Worker 线程组,分别负责监听新连接和处理已连接 Channel 的读写事件。在 ServerBootstrap 的配置中,我们指定了服务器的 channel 类型为 NioServerSocketChannel,并为每个新连接的 SocketChannel 添加了 StringDecoderStringEncoder 和自定义的 NettyServerHandler

NettyServerHandler 继承自 ChannelInboundHandlerAdapter,并重写了 channelRead 方法和 exceptionCaught 方法。在 channelRead 方法中,服务器接收到客户端发送的消息后,将消息打印并回显给客户端。在 exceptionCaught 方法中,当发生异常时,打印异常堆栈信息并关闭连接。

总结

通过对 Netty 源码中 Reactor 线程的分析,我们深入了解了 Netty 如何利用 Reactor 模式实现高性能的网络编程。Netty 的 Reactor 线程模型通过合理的线程分工和 I/O 多路复用技术,有效地提高了系统的并发处理能力。同时,Netty 提供的灵活的 ChannelHandlerChannelPipeline 机制,使得开发者可以方便地实现各种网络应用的业务逻辑。希望本文的分析和示例代码能够帮助读者更好地理解和应用 Netty 进行网络编程。