Netty源码分析之揭开reactor线程的面纱
Netty 中的 Reactor 模式概述
在深入 Netty 的 Reactor 线程源码之前,我们先来回顾一下 Reactor 模式的基本概念。Reactor 模式是一种基于事件驱动的设计模式,它用于处理高并发 I/O 操作。在传统的多线程 I/O 处理中,每个连接需要一个独立的线程来处理读写操作,这在高并发场景下会导致大量的线程开销,从而降低系统性能。Reactor 模式通过将 I/O 操作的多路复用与事件处理分离,有效地解决了这个问题。
在 Reactor 模式中,有一个或多个 Reactor 线程负责监听 I/O 事件(如连接建立、数据可读、数据可写等)。当有事件发生时,Reactor 线程将事件分发给对应的事件处理器(Handler)进行处理。这种设计模式使得系统可以高效地处理大量的并发连接,而不需要为每个连接创建一个单独的线程。
Netty 作为一款高性能的网络编程框架,其核心设计采用了 Reactor 模式。Netty 中的 Reactor 线程负责监听网络事件,并将事件分发给相应的 ChannelHandler 进行处理。下面我们将深入 Netty 的源码,剖析 Reactor 线程的实现细节。
Netty 中的 Reactor 线程模型
Netty 提供了两种主要的 Reactor 线程模型:单线程模型和多线程模型。
单线程模型
在单线程模型中,只有一个 Reactor 线程负责监听和处理所有的 I/O 事件。这个线程既要处理新连接的建立,又要处理已连接 Channel 的读写操作。虽然这种模型简单直接,但在高并发场景下,由于所有操作都在一个线程中执行,容易成为性能瓶颈。
多线程模型
Netty 的多线程模型是更为常用的一种模型。在这种模型中,有一组 Reactor 线程负责监听新连接事件(通常称为 Boss 线程组),另一组 Reactor 线程负责处理已连接 Channel 的读写事件(通常称为 Worker 线程组)。这种分工使得系统可以更好地利用多核 CPU 的性能,提高整体的并发处理能力。
Netty 中 Reactor 线程的源码实现
NioEventLoopGroup 类
在 Netty 中,NioEventLoopGroup
类是实现 Reactor 线程模型的关键类之一。它继承自 MultithreadEventLoopGroup
,负责管理一组 NioEventLoop
实例。
public class NioEventLoopGroup extends MultithreadEventLoopGroup {
// 省略其他代码
public NioEventLoopGroup(int nThreads, Executor executor, SelectorProvider selectorProvider) {
super(nThreads, executor, selectorProvider);
}
}
在创建 NioEventLoopGroup
实例时,可以指定线程数量(nThreads
)、线程执行器(executor
)以及选择器提供者(selectorProvider
)。MultithreadEventLoopGroup
类会根据指定的线程数量创建相应数量的 NioEventLoop
实例。
NioEventLoop 类
NioEventLoop
类是 Netty 中真正的 Reactor 线程实现类。它继承自 SingleThreadEventExecutor
,并实现了 EventLoop
接口。
public class NioEventLoop extends SingleThreadEventExecutor implements EventLoop {
private final Selector selector;
// 省略其他代码
protected NioEventLoop(EventLoopGroup parent, Executor executor, SelectorProvider selectorProvider) {
super(parent, executor, false);
this.selector = openSelector();
}
private Selector openSelector() {
try {
return selectorProvider.openSelector();
} catch (IOException e) {
throw new ChannelException("Failed to open a new selector", e);
}
}
}
NioEventLoop
类持有一个 Selector
实例,用于实现 I/O 多路复用。在构造函数中,通过 openSelector()
方法创建一个新的 Selector
实例。
事件循环
NioEventLoop
类的核心是 run()
方法,它实现了 Reactor 线程的事件循环。
protected void run() {
for (;;) {
try {
try {
int selectCnt = select(wakenUp.getAndSet(false));
if (selectCnt != 0 || wakenUp.get() || hasTasks()) {
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} else if (unexpectedSelectorWakeup(selectCnt)) {
processSelectedKeys();
}
} catch (CancelledKeyException e) {
cancelledKeys++;
}
} catch (Throwable t) {
handleLoopException(t);
}
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
run()
方法中的事件循环大致分为以下几个步骤:
- 调用
select
方法:通过select(wakenUp.getAndSet(false))
方法阻塞等待 I/O 事件发生。wakenUp
变量用于标识是否有外部线程唤醒了当前 Reactor 线程。 - 处理选中的键:如果有事件发生(
selectCnt != 0
),或者线程被唤醒(wakenUp.get()
),或者任务队列中有任务(hasTasks()
),则调用processSelectedKeys()
方法处理选中的SelectionKey
。 - 执行任务队列中的任务:在处理完 I/O 事件后,调用
runAllTasks()
方法执行任务队列中的所有任务。任务队列中通常包含一些非 I/O 相关的操作,如用户自定义的业务逻辑。 - 处理异常:在事件循环过程中,如果发生异常,通过
handleLoopException(t)
方法进行处理。 - 关闭操作:如果当前 Reactor 线程正在关闭(
isShuttingDown()
),则调用closeAll()
方法关闭所有相关的 Channel,并通过confirmShutdown()
方法确认是否可以退出事件循环。
processSelectedKeys 方法
processSelectedKeys()
方法负责处理 Selector
选中的 SelectionKey
。
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
selectedKeys.keys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (cancelledKeys != 0) {
selectedKeys.reset(i + 1);
break;
}
}
}
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
return;
}
if (eventLoop != this || eventLoop == null) {
return;
}
unsafe.close(unsafe.voidPromise());
return;
}
try {
int readyOps = k.readyOps();
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
return;
}
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
} catch (Throwable t) {
handleLoopException(t);
try {
unsafe.close(unsafe.voidPromise());
} catch (Throwable t2) {
handleLoopException(t2);
}
}
}
在 processSelectedKeysOptimized()
方法中,遍历 selectedKeys
数组,获取每个 SelectionKey
及其关联的 AbstractNioChannel
。然后调用 processSelectedKey(k, ch)
方法处理具体的 I/O 事件。
在 processSelectedKey(k, ch)
方法中,首先检查 SelectionKey
是否有效。如果无效,则关闭对应的 Channel
。然后根据 SelectionKey
的就绪操作(readyOps
)判断是读事件(OP_READ
)、连接事件(OP_ACCEPT
)还是写事件(OP_WRITE
),并调用相应的 unsafe
方法进行处理。例如,对于读事件,调用 unsafe.read()
方法读取数据;对于写事件,调用 unsafe.forceFlush()
方法将缓冲区中的数据发送出去。
Reactor 线程与 ChannelHandler 的交互
在 Netty 中,Reactor 线程与 ChannelHandler
之间通过 ChannelPipeline
进行交互。ChannelPipeline
是一个 ChannelHandler
的链表,它负责将 I/O 事件依次传递给链表中的各个 ChannelHandler
进行处理。
当 Reactor 线程处理 I/O 事件时,会调用 Channel
的 unsafe
方法,这些方法会触发 ChannelPipeline
中的事件传播。例如,当读取到数据时,unsafe.read()
方法会调用 ChannelPipeline
的 fireChannelRead()
方法,将读取到的数据传递给 ChannelPipeline
中的第一个 ChannelHandler
。
public final class DefaultChannelPipeline implements ChannelPipeline {
// 省略其他代码
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
}
final class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext {
// 省略其他代码
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}
}
在 fireChannelRead()
方法中,首先调用 AbstractChannelHandlerContext.invokeChannelRead(head, msg)
方法开始事件传播。invokeChannelRead()
方法会根据当前 ChannelHandlerContext
所在的线程,决定是直接调用 invokeChannelRead(m)
方法,还是将任务提交到对应的 EventExecutor
中执行。
在 invokeChannelRead()
方法中,如果当前 ChannelHandler
是 ChannelInboundHandler
,则调用其 channelRead(this, msg)
方法处理事件。如果处理过程中发生异常,则通过 notifyHandlerException(t)
方法进行处理。如果当前 ChannelHandler
不是 ChannelInboundHandler
,则继续调用 fireChannelRead(msg)
方法将事件传递给下一个 ChannelHandler
。
示例代码
下面我们通过一个简单的 Netty 服务器示例,来展示 Reactor 线程模型的实际应用。
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
public class NettyServer {
private static final int PORT = 8888;
public static void main(String[] args) {
// 创建 Boss 线程组,负责监听新连接
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
// 创建 Worker 线程组,负责处理已连接 Channel 的读写事件
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(new NettyServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
// 绑定端口,开始接收进来的连接
ChannelFuture f = b.bind(PORT).sync();
System.out.println("Server started on port " + PORT);
// 等待服务器 socket 关闭
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("Server received: " + msg);
ctx.writeAndFlush("Message received by server: " + msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
在上述示例中,我们创建了一个 Netty 服务器。通过 NioEventLoopGroup
创建了 Boss 线程组和 Worker 线程组,分别负责监听新连接和处理已连接 Channel 的读写事件。在 ServerBootstrap
的配置中,我们指定了服务器的 channel 类型为 NioServerSocketChannel
,并为每个新连接的 SocketChannel
添加了 StringDecoder
、StringEncoder
和自定义的 NettyServerHandler
。
NettyServerHandler
继承自 ChannelInboundHandlerAdapter
,并重写了 channelRead
方法和 exceptionCaught
方法。在 channelRead
方法中,服务器接收到客户端发送的消息后,将消息打印并回显给客户端。在 exceptionCaught
方法中,当发生异常时,打印异常堆栈信息并关闭连接。
总结
通过对 Netty 源码中 Reactor 线程的分析,我们深入了解了 Netty 如何利用 Reactor 模式实现高性能的网络编程。Netty 的 Reactor 线程模型通过合理的线程分工和 I/O 多路复用技术,有效地提高了系统的并发处理能力。同时,Netty 提供的灵活的 ChannelHandler
和 ChannelPipeline
机制,使得开发者可以方便地实现各种网络应用的业务逻辑。希望本文的分析和示例代码能够帮助读者更好地理解和应用 Netty 进行网络编程。