4.3 Netty中的Reactor线程模型
上面提到的几种线程模型,在我们编写的基于netty的应用中都有可能出现,甚至可能会不用reactor线程。具体属于哪一种情况,要看我们的代码是如何编写的。
我们先以一个使用了reactor线程模型的netty服务端的典型代码进行说明:
EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(3); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .option(ChannelOption.SO_BACKLOG, 128) .attr(AttributeKey.valueOf("ssc.key"),"scc.value") .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new DiscardServerHandler()); } }) .childOption(ChannelOption.SO_KEEPALIVE, true); .childAttr(AttributeKey.valueOf("sc.key"),"sc.value") .bind(port);
在上述代码片段中代码很少,却包含了一个复杂reactor线程模型,如下所示:
图中大致包含了5个步骤,而我们编写的服务端代码中可能并不能完全体现这样的步骤,因为Netty将其中一些步骤的细节隐藏了,笔者将会通过图形分析与源码分析相结合的方式帮助读者理解这五个步骤。这个五个步骤可以按照以下方式简要概括:
设置服务端ServerBootStrap启动参数
通过ServerBootStrap的bind方法启动服务端,bind方法会在parentGroup中注册NioServerScoketChannel,监听客户端的连接请求
Client发起连接CONNECT请求,parentGroup中的NioEventLoop不断轮循是否有新的客户端请求,如果有,ACCEPT事件触发
ACCEPT事件触发后,parentGroup中NioEventLoop会通过NioServerSocketChannel获取到对应的代表客户端的NioSocketChannel,并将其注册到childGroup中
childGroup中的NioEventLoop不断检测自己管理的NioSocketChannel是否有读写事件准备好,如果有的话,调用对应的ChannelHandler进行处理
需要提醒的是,这五个步骤是笔者自己的总结,主要是为了方便理解,并不是官方的划分。
下面我们开始详细介绍每一个步骤:
1、设置服务端ServerBootStrap启动参数
ServerBootStrap继承自AbstractBootstrap,其代表服务端的启动类,当调用其bind方法时,表示启动服务端。在启动之前,我们会调用group,channel、handler、option、attr、childHandler、childOption、childAttr等方法来设置一些启动参数。
group方法:
group可以认为是设置执行任务的线程池,在Netty中,EventLoopGroup 的作用类似于线程池,每个EventLoopGroup中包含多个EventLoop对象,代表不同的线程。特别的,我们创建的是2个EventLoopGroup的子类NioEventLoopGroup的实例:parentGroup、childGroup, 所以实际上包含的是多个NioEventLoop对象。从名字上就可以看出,二者是专门用于处理java nio事件的,某种程度上这也验证了我们前面的提到的Netty中的reactor线程模型是结合java nio编程特点来设计的说法。
在创建parentGroup 、childGroup 时,分别传入了构造参数1和3,这对应了上图中红色部分的parentGroup 中只有1个NioEventLoop,绿色部分的childGroup 中有3个NioEventLoop。
特别的,如果我们创建NioEventLoopGroup 的时候,没有指定参数,或者传入的是0,那么这个NioEventLoopGroup包含的NioEventLoop个数将会是:cpu核数*2。
具体可参见NioEventLoopGroup的父类MultithreadEventLoopGroup构造时的相关代码
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup { private static final int DEFAULT_EVENT_LOOP_THREADS;//默认线程数量,cpu核数*2 ....... static { DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt( "io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2)); if (logger.isDebugEnabled()) { logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS); } } protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) { super(nThreads == 0? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args); } ....... }
在创建完parentGroup 和childGroup 之后,我们把其当做参数传递给了ServerBootStrap,通过调用带有2个参数的group方法。在这个方法中,会把parentGroup 当做参数传递给ServerBootStrap的父类AbstractBootstrap来进行维护,childGroup 则由ServerBootStrap自己维护。
之后,我们可以调用ServerBootStrap 的group()方法来获取parentGroup 的引用,这个方法父类AbstractBootstrap继承的。另外可以通过调用ServerBootStrap自己定义的childGroup()方法来获取workerGroup的引用。
相关代码如下:
io.netty.bootstrap.ServerBootstrap
public final class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> { .......... private volatile EventLoopGroup childGroup;//ServerBootStrap自己维护childGroup .......... public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) { super.group(parentGroup);//将parentGroup传递给父类AbstractBootstrap处理 if (childGroup == null) { throw new NullPointerException("childGroup"); } if (this.childGroup != null) { throw new IllegalStateException("childGroup set already"); } this.childGroup = childGroup;//给childGroup赋值 return this; } ....... public EventLoopGroup childGroup() {//获取childGroup return childGroup; } }
io.netty.bootstrap.AbstractBootstrap
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable { ...... private volatile EventLoopGroup group;//这个字段将会被设置为parentGroup ...... public B group(EventLoopGroup group) { if (group == null) { throw new NullPointerException("group"); } if (this.group != null) { throw new IllegalStateException("group set already"); } this.group = group; return (B) this; } ...... public final EventLoopGroup group() {//获取parentGroup return group; } }
channel方法:
channel继承自AbstractBootstrap,用于构造通道的工厂类ChannelFactory实例,在之后需要创建通道实例,例如NioServerSocketChannel的时候,通过调用ChannelFactory.newChannel()方法来创建。
channel方法内部隐含的调用了channelFactory方法,我们也可以直接调用这个方法。
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable { ........ //这个工厂类最终创建的通道实例,就是channel方法指定的NioServerSocketChannel private volatile ChannelFactory<? extends C> channelFactory; .......... public B channel(Class<? extends C> channelClass) { ...... return channelFactory(new BootstrapChannelFactory<C>(channelClass)); } public B channelFactory(ChannelFactory<? extends C> channelFactory) { .......... this.channelFactory = channelFactory; return (B) this; } ....... final ChannelFactory<? extends C> channelFactory() { return channelFactory; } }
细心的读者会发现,除了channel、group方法之外,其他三个方法都是一 一对应的:
handler-->childHandler :分别用于设置NioServerSocketChannel和NioSocketChannel的处理器链,也就是当有一个NIO事件的时候,应该按照怎样的步骤进行处理。
option-->childOption:分别用于设置NioServerSocketChannel和 NioSocketChannel的TCP连接参数,在ChannelOption类中可以看到Netty支持的所有TCP连接参数。
attr-->childAttr:用于给channel设置一个key/value,之后可以根据key获取
其中:
handler、option、attr方法,都是从AbstractBootstrap中继承的。这些方法设置的参数,将会被应用到NioServerSocketChannel实例上,由于NioServerSocketChannel一般只会创建一个,因此这些参数通常只会应用一次。源码如下所示:
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable { ... private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>(); private final Map<AttributeKey<?>, Object> attrs = new LinkedHashMap<AttributeKey<?>, Object>(); private volatile ChannelHandler handler; ... public B channel(Class<? extends C> channelClass) { .... return channelFactory(new BootstrapChannelFactory<C>(channelClass)); } public B channelFactory(ChannelFactory<? extends C> channelFactory) { ....... this.channelFactory = channelFactory; return (B) this; } final ChannelHandler handler() {return handler;} public B handler(ChannelHandler handler) {.....} public <T> B option(ChannelOption<T> option, T value){...}//设置ChannelOption参数 final Map<ChannelOption<?>, Object> options(){return options;}//获取ChannelOption参数 public <T> B attr(AttributeKey<T> key, T value){.....} //设置属性 final Map<AttributeKey<?>, Object> attrs() {return attrs;}//获取属性 }
childHandler、childOption、childAttr方法是ServerBootStrap自己定义的,这些方法设置的参数,将会被应用到NioSocketChannel实例上,由于服务端每次接受到一个客户端连接,就会创建一个NioSocketChannel实例,因此每个NioSocketChannel实例都会应用这些方法设置的参数。
public final class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> { ........... private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>(); private final Map<AttributeKey<?>, Object> childAttrs = new LinkedHashMap<AttributeKey<?>, Object>(); private volatile ChannelHandler childHandler; public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) { super.group(parentGroup); .... this.childGroup = childGroup; return this; } public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) {...} public ServerBootstrap childHandler(ChannelHandler childHandler) {......} .............. }
2 调用ServerBootStrap的bind方法
调用bind方法,就相当于启动了服务端。启动的核心逻辑都是在bind方法中。
bind方法内部,会创建一个NioServerSocketChannel实例,并将其在parentGroup中进行注册,注意这个过程对用户屏蔽了。
parentGroup在接受到注册请求时,会从自己的管理的NioEventLoop中,选择一个进行注册。由于我们的案例中,parentGroup只有一个NioEventLoop,因此只能注册到这个上。
一旦注册完成,我们就可以通过NioServerSocketChannel检测有没有新的客户端连接的到来。
如果一步一步追踪ServerBootStrap.bind方法的调用链,最终会定位到ServerBootStrap 父类AbstractBootstrap的doBind方法,相关源码如下:
io.netty.bootstrap.AbstractBootstrap#doBind
private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister();//初始化NioServerSocketChannel,并注册到bossGroup中 ....//省略 return promise;
doBind方法中,最重要的调用的方法是initAndRegister,这个方法主要完成3个任务
1、创建NioServerSocketChannel实例,这是通过之前创建的ChannelFactory实例的newChannel方法完成
2、初始化NioServerSocketChannel,即将我们前面通过handler,option,attr等方法设置的参数应用到NioServerSocketChannel上
3、将NioServerSocketChannel 注册到parentGroup中,parentGroup会选择其中一个NioEventLoop来运行这个NioServerSocketChannel要完成的功能,即监听客户端的连接。
以下是io.netty.bootstrap.AbstractBootstrap#initAndRegister的源码
final ChannelFuture initAndRegister() { final Channel channel = channelFactory().newChannel();//1、创建NioServerSocketChannel实例 try { init(channel);//2、初始化NioServerSocketChannel,这是一个抽象方法,ServerBootStrap对此进行了覆盖 } catch (Throwable t) { channel.unsafe().closeForcibly(); return channel.newFailedFuture(t); } ChannelFuture regFuture = group().register(channel);//3、NioServerSocketChannel注册到parentGroup中 if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }
ServerBootStrap实现了AbstractBootstrap的抽象方法init,对NioServerSocketChannel进行初始化。熟悉设计模式的同学会意识到,这是典型的模板设计模式,即父类运行过程中会调用多个方法,子类对特定的方法进行覆写。
在这里,init方法主要是为NioServerSocketChannel设置运行参数,也就是我们前面通过调用ServerBootStrap的option、attr、handler等方法指定的参数。
特别需要注意的是,除了我们通过handler方法为NioServerSocketChannel 指定的ChannelHandler之外(在我们这里是LoggingHandler),ServerBootStrap的init方法总是会帮我们在NioServerSocketChannel 的处理器链的最后添加一个默认的处理器ServerBootstrapAcceptor。
从ServerBootstrapAcceptor 名字上可以看出来,其是客户端连接请求的处理器。当接受到一个客户端请求之后,Netty会将创建一个代表客户端的NioSocketChannel对象。而我们通过ServerBoodStrap指定的channelHandler、childOption、childAtrr、childGroup等参数,也需要设置到NioSocketChannel中。但是明显现在,由于只是服务端刚启动,没有接收到任何客户端请求,还没有认为NioSocketChannel实例,因此这些参数要保存到ServerBootstrapAcceptor中,等到接收到客户端连接的时候,再将这些参数进行设置,我们可以看到这些参数通过构造方法传递给了ServerBootstrapAcceptor。
源码如下所示:
io.netty.bootstrap.ServerBootstrap#init
@Override void init(Channel channel) throws Exception {//channel参数类型就是NioServerSocketChannel //1、为NioServerSocketChannel设置option方法设置的参数 final Map<ChannelOption<?>, Object> options = options(); synchronized (options) { channel.config().setOptions(options); } //2、为NioServerSocketChannel设置attr方法设置的参数 final Map<AttributeKey<?>, Object> attrs = attrs(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } //3、为NioServerSocketChannel设置通过handler方法指定的处理器 ChannelPipeline p = channel.pipeline(); if (handler() != null) { p.addLast(handler()); } //4、为NioSocketChannel设置默认的处理器ServerBootstrapAcceptor,并将相关参数通过构造方法传给ServerBootstrapAcceptor final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new ServerBootstrapAcceptor( currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); }
io.netty.channel.EventLoopGroup#register方法
在初始化完成之后,ServerBootStrap通过调用register方法将NioServerSocketChannel注册到了parentGroup中。
从较高的层面来说,parentGroup 的类型是NioEventLoopGroup,一个NioEventLoopGroup可能会管理多个NioEventLoop,对于通道的注册,NioEventLoopGroup会从多个NioEventLoop中选择一个来执行真正的注册。之后这个通道的nio事件,也都是由这个NioEventLoop来处理。也就是说,一个通道只能由一个NioEventLoop来处理,一个NioEventLoop可以处理多个通道,通道与NioEventLoop是多对一的关系。
NioEventLoopGroup的register方法继承自MultithreadEventLoopGroup。
代码如下所示:
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup { ....... @Override public ChannelFuture register(Channel channel) { return next().register(channel); } ....... }
next方法的返回值,就是NioEventLoop,可以看到,真正的注册工作,是NioEventLoop完成的。next()方法还提供了通道在NioEventLoop中平均分配的机制。
NioEventLoopGroup创建的时候,其父类MultithreadEventExecutorGroup中会创建一个EventExecutorChooser实例,之后通过其来保证通道平均注册到不同的NioEventLoop中。
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup { .... private final EventExecutor[] children;//NioEventLoop是EventExecutor的子类,这里的children指的就是NioEventLoop private final AtomicInteger childIndex = new AtomicInteger();//上一次接受注册任务的EventEexcutor编号 private final EventExecutorChooser chooser; ... protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) { ...... children = new SingleThreadEventExecutor[nThreads]; if (isPowerOfTwo(children.length)) {//如果指定的线程数是2的幂 chooser = new PowerOfTwoEventExecutorChooser(); } else { chooser = new GenericEventExecutorChooser();//按照round-robin的方式,来保证平均 } for (int i = 0; i < nThreads; i ++) { boolean success = false; try {//创建EventExecutor实例,注意newChild是抽象方法,NioEventLoopGroup对此方法进行了覆盖,返回的实例是NioEventLoop。 children[i] = newChild(threadFactory, args); success = true; } catch (Exception e) { ............. } } ...... } //调用此方法,即可以保证任务的平均分配 @Override public EventExecutor next() { return chooser.next(); } private final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser { @Override public EventExecutor next() { return children[childIndex.getAndIncrement() & children.length - 1]; } } private final class GenericEventExecutorChooser implements EventExecutorChooser { @Override public EventExecutor next() { return children[Math.abs(childIndex.getAndIncrement() % children.length)]; } } ............ }