目前对于Netty的理解是:一套完善了Java NIO操作的框架,因为Netty的最底层还是调用jdk的nio相关的API,但是又在jdk的nio基础上做了很多的封装,并衍生出来了自己相关的概念。
服务启动的主线操作 以EchoServer
为例,一条可参考的服务启动的主线操作如下:
main thread
创建selector
创建serversocketchannel
初始化serversocketchannel
给serversocketchannel从bossgroup中选择一个NioEventLoop
boss thread
将serversocketchannel注册到选择的NioEventLoop的selector
绑定地址启动
注册接受连接事件(OP_ACCEPT)到selector上
对应到代码中的操作依次为:
1 2 3 4 5 6 7 8 9 10 Selector selector = sun.nio.ch.SelectorProviderImpl.openSelector();ServerSocketChannel serverSocketChannel = provider.openServerSocketChannel();selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0 , this ); javaChannel().bind(localAddress, config.getBacklog()); selectionKey.interestOps(OP_ACCEPT);
根据上面的启动主线,以它为一个参考,我觉得这个主线过于简略,也就是说很多操作并不能体现出来。所以我将创建一个server的步骤分成如下3个大的步骤。下面会对上述3个大步骤做适当、尽可能详细的分析,并将之前看过的源码内容与服务创建联系起来。
创建EventLoopGroup
EventLoopGroup
的个数决定具体reactor模式,在EchoServer
中使用了两个EventLoopGroup
,也就是使用了主从Reactor多线程模式 ;而EventLoopGroup
的类型决定使用的IO模式,这里使用的是``NioEventLoopGroup也就是使用的IO模式为NIO。对应于
EchoServer`中的代码为:
1 2 EventLoopGroup bossGroup = new NioEventLoopGroup (1 );EventLoopGroup workerGroup = new NioEventLoopGroup ();
到这里有若干个问题,但是主线中不会提及,因为主线关注的是操作链条,在大致了解主线操作之后,加强对个各个细节处的理解,才能理解得更加透彻。这里的问题主要来自心中的疑问。
什么是EventLoopGroup
、EventLoop
之前的初略理解是:EventLoopGroup
是一个线程池、EventLoop
则对应一个线程。
EventLoopGroup
是一个接口,有多种实现方式,在EchoServer
中,它的实现是NioEventLoopGroup
。
Channel
、EventLoop
、EventLoopGroup
之间的关系如下图所示:
一个EventLoopGroup
包含一个或者多个EventLoop
;
一个EventLoop
在它的生命周期内只和一个Thread
绑定;
所有由EventLoop
处理的I/O事件都将在它专有的Thread
上被处理;
一个Channel
在它的生命周期内只注册于一个EventLoop
;
一个EventLoop
可能会被分配给一个或多个Channel
。
为什么new NioEventLoopGroup(1)
传1 在前面的一篇文章中有分析过为什么传1,因为对于boss group来说,只会有一个channel,所以只绑定1个线程,也就只需要1个EventLoop
。
如果不传任何参数,线程数在NioEventLoopGroup
中会先传0,在父类MultithreadEventLoopGroup
先再做判断,如果为0,那么会默认为:
1 2 3 4 5 6 DEFAULT_EVENT_LOOP_THREADS = Math.max(1 , SystemPropertyUtil.getInt( "io.netty.eventLoopThreads" , NettyRuntime.availableProcessors() * 2 )); protected MultithreadEventLoopGroup (int nThreads, Executor executor, Object... args) { super (nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); }
获取SelectorProvider 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public NioEventLoopGroup (ThreadFactory threadFactory) { this (0 , threadFactory, SelectorProvider.provider()); } public static SelectorProvider provider () { synchronized (lock) { if (provider != null ) return provider; return AccessController.doPrivileged( new PrivilegedAction <SelectorProvider>() { public SelectorProvider run () { if (loadProviderFromProperty()) return provider; if (loadProviderAsService()) return provider; provider = sun.nio.ch.DefaultSelectorProvider.create(); return provider; } }); } }
其中sun.nio.ch.DefaultSelectorProvider
在不同平台jdk中、其实现不一样,从而以此达到,统一不同平台下Selector
的实现。其中Windows下的实现如下:
1 2 3 4 5 6 7 8 public class DefaultSelectorProvider { private DefaultSelectorProvider () { } public static SelectorProvider create () { return new WindowsSelectorProvider (); } }
根据线程数创建EventExecutor
数组并初始化 在MultithreadEventExecutorGroup
的构造函数中,会初始化一个EventExecutor
数组,其实这就是NioEventLoop
数组。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 children = new EventExecutor [nThreads]; for (int i = 0 ; i < nThreads; i ++) { boolean success = false ; try { children[i] = newChild(executor, args); success = true ; } catch (Exception e) { throw new IllegalStateException ("failed to create a child event loop" , e); } finally { if (!success) { } } } chooser = chooserFactory.newChooser(children);
选择器是用来选择一个EventLoop
,供进行事件处理。具体有两种策略,一种是EventLoop个数为2的倍数(通过&运算)、一种是普通的(通过%运算)。说真的,我对这两者的差异没有啥感觉。
对于newChild()
这个方法,我们找到对应的NioEventLoopGroup
的实现:
也就是说,在初始化NioEventLoopGroup
的时候,就已经将所有的NioEventLoop
初始化完成。
NioEventLoop的初始化
打开openSelector()
操作。说明selector在初始化NioEventLoop
时就被打开。
1 2 3 4 5 6 7 8 9 10 11 final SelectorTuple selectorTuple = openSelector();private SelectorTuple openSelector () { final Selector unwrappedSelector; try { unwrappedSelector = provider.openSelector(); } catch (IOException e) { throw new ChannelException ("failed to open a new selector" , e); } ... }
初始化一个包装Runnable后的executor。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 this .executor = ThreadExecutorMap.apply(executor, this );public static Executor apply (final Executor executor, final EventExecutor eventExecutor) { ObjectUtil.checkNotNull(executor, "executor" ); ObjectUtil.checkNotNull(eventExecutor, "eventExecutor" ); return new Executor () { @Override public void execute (final Runnable command) { executor.execute(apply(command, eventExecutor)); } }; } public static Runnable apply (final Runnable command, final EventExecutor eventExecutor) { ObjectUtil.checkNotNull(command, "command" ); ObjectUtil.checkNotNull(eventExecutor, "eventExecutor" ); return new Runnable () { @Override public void run () { setCurrentEventExecutor(eventExecutor); try { command.run(); } finally { setCurrentEventExecutor(null ); } } }; }
通过Bootstrap
设置相关参数 绑定端口并启动 这一步骤有多个重要的过程。
创建NioServerSocketChannel
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) public B channel (Class<? extends C> channelClass) { return channelFactory(new ReflectiveChannelFactory <C>( ObjectUtil.checkNotNull(channelClass, "channelClass" ) )); } public B channelFactory (io.netty.channel.ChannelFactory<? extends C> channelFactory) { return channelFactory((ChannelFactory<C>) channelFactory); } public B channelFactory (ChannelFactory<? extends C> channelFactory) { ObjectUtil.checkNotNull(channelFactory, "channelFactory" ); if (this .channelFactory != null ) { throw new IllegalStateException ("channelFactory set already" ); } this .channelFactory = channelFactory; return self(); }
此处的channelFactory
是一个ReflectiveChannelFactory
,它的实现比较简单,通过class获取到构造器,然后通过构造器即可创建出新的对象 。简单的逻辑如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public ReflectiveChannelFactory (Class<? extends T> clazz) { ObjectUtil.checkNotNull(clazz, "clazz" ); try { this .constructor = clazz.getConstructor(); } catch (NoSuchMethodException e) { throw new IllegalArgumentException (...); } } @Override public T newChannel () { try { return constructor.newInstance(); } catch (Throwable t) { throw new ChannelException (...); } }
初始化完channelFactory
之后,进入doBind()
阶段,此时会通过一个channelFactory
来创建一个新的对象,即NioServerSocketChannel
。如下:
1 2 3 4 5 6 7 8 final ChannelFuture initAndRegister () { Channel channel = null ; try { channel = channelFactory.newChannel(); } }
在NioServerSocketChannel
创建的时候,会创建一个ServerSocketChannel
,调用栈依次为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();public NioServerSocketChannel () { this (newSocket(DEFAULT_SELECTOR_PROVIDER)); } public NioServerSocketChannel (ServerSocketChannel channel) { super (null , channel, SelectionKey.OP_ACCEPT); config = new NioServerSocketChannelConfig (this , javaChannel().socket()); } protected AbstractNioMessageChannel (Channel parent, SelectableChannel ch, int readInterestOp) { super (parent, ch, readInterestOp); } protected AbstractNioChannel (Channel parent, SelectableChannel ch, int readInterestOp) { super (parent); this .ch = ch; this .readInterestOp = readInterestOp; try { ch.configureBlocking(false ); } catch (IOException e) { } } protected AbstractChannel (Channel parent) { this .parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); } protected DefaultChannelPipeline newChannelPipeline () { return new DefaultChannelPipeline (this ); }
初始化刚创建的NioServerSocketChannel 1 2 3 4 5 6 7 8 9 final ChannelFuture initAndRegister () { Channel channel = null ; try { channel = channelFactory.newChannel(); init(channel); } }
在init()
中,主要是给新创建的NioServerSocketChannel
设置参数,然后在它的pipeline上面添加一个ChannelHandler
,用来处理新建立的连接,也就是ServerBootstrapAcceptor
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 @Override void init (Channel channel) { ChannelPipeline p = channel.pipeline(); p.addLast(new ChannelInitializer <Channel>() { @Override public void initChannel (final Channel ch) { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null ) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable () { @Override public void run () { pipeline.addLast(new ServerBootstrapAcceptor ( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }
其中,p.addLast()
执行的时候,如果当前channel,即NioServerSocketChannel
,没有注册到一个EventLoop,那么将会以PendingHandlerCallback
的形式,保存到pipeline的pengdingHandlerCallbackHead
这个链表上。
其中PendingHandlerCallback
可以简单理解成:只是一个保存有AbstractChannelHandlerContext
的Runnable
,并且具有单向链表结构。它的设计目的,就是等待某个时候被执行。
注册channel到eventloop 从ChannelFuture regFuture = config().group().register(channel);
开始,一路可以追踪到AbstractChannel
的register
方法里面。刚开始有一段状态判断的代码,这段代码可以印证前面的一个说法:一个channel只能注册到一个eventloop 。
接着在eventLoop的线程中执行register0()
方法,也就是说main线程开始返回。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable () { @Override public void run () { register0(promise); } }); } }
也就是说,initAndRegister()
返回之后会拿到一个ChannelFuture,
1 2 3 4 5 6 7 8 9 10 11 final ChannelFuture initAndRegister () { Channel channel = null ; try { channel = channelFactory.newChannel(); init(channel); } ChannelFuture regFuture = config().group().register(channel); return regFuture; }
如果在eventLoop中执行register0()
完毕,那么将继续执行doBind0()
,在main线程中;如果没执行完毕,会等register0()
在eventLoop中执行完毕之后,再执行doBind0()
,此时的线程是eventLoop。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 private ChannelFuture doBind (final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.isDone()) { ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { final PendingRegistrationPromise promise = new PendingRegistrationPromise (channel); regFuture.addListener(new ChannelFutureListener () { @Override public void operationComplete (ChannelFuture future) throws Exception { doBind0(regFuture, channel, localAddress, promise); } }); return promise; } }
至此,EchoServer
中ChannelFuture f = b.bind(PORT).sync()
中的b.bind(PORT)
在main线程中的操作已经完成。所以当前步骤又分成了两个,即:1. register0;2. doBind0。
1. register0 此函数关键操作的流程如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 private void register0 (ChannelPromise promise) { try { doRegister(); neverRegistered = false ; registered = true ; pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); pipeline.fireChannelRegistered(); if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } } catch (Throwable t) { closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
执行PendingHandlerCallback的流程 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 final void invokeHandlerAddedIfNeeded () { assert channel.eventLoop().inEventLoop(); if (firstRegistration) { firstRegistration = false ; callHandlerAddedForAllHandlers(); } } private void callHandlerAddedForAllHandlers () { final PendingHandlerCallback pendingHandlerCallbackHead; synchronized (this ) { assert !registered; registered = true ; pendingHandlerCallbackHead = this .pendingHandlerCallbackHead; this .pendingHandlerCallbackHead = null ; } PendingHandlerCallback task = pendingHandlerCallbackHead; while (task != null ) { task.execute(); task = task.next; } }
PendingHandlerAddedTask
的结构在前面已有描述,这里补充一下它的代码实现。其实task.execute();
最终执行的代码是callHandlerAdded0(ctx)
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 private abstract static class PendingHandlerCallback implements Runnable { final AbstractChannelHandlerContext ctx; PendingHandlerCallback next; PendingHandlerCallback(AbstractChannelHandlerContext ctx) { this .ctx = ctx; } abstract void execute () ; } private final class PendingHandlerAddedTask extends PendingHandlerCallback { @Override public void run () { callHandlerAdded0(ctx); } @Override void execute () { EventExecutor executor = ctx.executor(); if (executor.inEventLoop()) { callHandlerAdded0(ctx); } else { try { executor.execute(this ); } catch (RejectedExecutionException e) { atomicRemoveFromHandlerList(ctx); ctx.setRemoved(); } } } }
执行完ChanneInitializer
之后,将其从pipeline中移除。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 private void callHandlerAdded0 (final AbstractChannelHandlerContext ctx) { try { ctx.callHandlerAdded(); } catch (Throwable t) { boolean removed = false ; try { atomicRemoveFromHandlerList(ctx); ctx.callHandlerRemoved(); removed = true ; } } } final void callHandlerAdded () throws Exception { if (setAddComplete()) { handler().handlerAdded(this ); } } @Override public void handlerAdded (ChannelHandlerContext ctx) throws Exception { if (ctx.channel().isRegistered()) { if (initChannel(ctx)) { removeState(ctx); } } } private boolean initChannel (ChannelHandlerContext ctx) throws Exception { if (initMap.add(ctx)) { try { initChannel((C) ctx.channel()); } catch (Throwable cause) { } finally { ChannelPipeline pipeline = ctx.pipeline(); if (pipeline.context(this ) != null ) { pipeline.remove(this ); } } return true ; } return false ; } protected abstract void initChannel (C ch) throws Exception;
再来看一眼当时的initChannel
是如何实现的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 p.addLast(new ChannelInitializer <Channel>() { @Override public void initChannel (final Channel ch) { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null ) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable () { @Override public void run () { pipeline.addLast(new ServerBootstrapAcceptor ( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } });
此时的pipeline.addLast(new ServerBootstrapAcceptor...)
中的ServerBootstrapAcceptor
,是不是也会像ChannleInitializer
一样被删除呢?
从直觉上来说,是不会的,因为如果删掉了谁来处理新建立的连接,那么它为什么没有被删掉呢?因为ServerBootstrapAcceptor
并没有重写handlerAdded()
这个方法,使用的父类默认实现,即啥都不干。它与ChannelInitializer
同是继承自ChannelInboundHandlerAdapter
,但是ChannelInitializer
重写了handlerAdded()
, 并在这个函数中,在初始化之后,将自身从pipeline上移除掉了。因为它只需要执行一次就即可。
safeSetSuccess(promise) 使用了观察者模式,将执行之前注册到ChannelFuture上面的ChannelFutureListener
,也就是会接着执行doBind0()。这部分代码逻辑比较明了,看关键的代码片段即可:
经典的观察者模式,套路满满。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 private void notifyListeners0 (DefaultFutureListeners listeners) { GenericFutureListener<?>[] a = listeners.listeners(); int size = listeners.size(); for (int i = 0 ; i < size; i ++) { notifyListener0(this , a[i]); } } private static void notifyListener0 (Future future, GenericFutureListener l) { try { l.operationComplete(future); } catch (Throwable t) { } }
执行此ChannelFutureListener,也就是进入doBind0()阶段,这个操作内容不多,只是将bind的具体操作放到了Runnable中,然后扔到eventLoop的taskQueue中,等待下次eventLoop执行该task。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 regFuture.addListener(new ChannelFutureListener () { @Override public void operationComplete (ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null ) { } else { promise.registered(); doBind0(regFuture, channel, localAddress, promise); } } }); private static void doBind0 (...) { channel.eventLoop().execute(new Runnable () { @Override public void run () { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
fireChannelRegistered 关于消息的传递,可以放到后续的连接建立后的消息发送模块。这里先提出我的几个问题:
事件如何在pipeline中传递?
什么是executionMask
?这个实现看起来很有趣,通过executionMask
与对应的事件编码相与,得出此Handler是否可以处理此消息。详细可后续进一步分析。
至此register0的操作完成。
2. doBind0 框架代码的流程:从pipeline的tail出发,触发bind事件。
1 2 3 4 5 6 7 8 9 10 11 12 13 channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); @Override public ChannelFuture bind (SocketAddress localAddress, ChannelPromise promise) { return pipeline.bind(localAddress, promise); } public final ChannelFuture bind (SocketAddress localAddress, ChannelPromise promise) { return tail.bind(localAddress, promise); }
此处的tail是LoggingHandler
,也就是说会打印出bind日志:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 @Override public ChannelFuture bind (final SocketAddress localAddress, final ChannelPromise promise) { final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeBind(localAddress, promise); } else { safeExecute(executor, new Runnable () { @Override public void run () { next.invokeBind(localAddress, promise); } }, promise, null , false ); } return promise; } private void invokeBind (SocketAddress localAddress, ChannelPromise promise) { if (invokeHandler()) { try { ((ChannelOutboundHandler) handler()).bind(this , localAddress, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } } else { bind(localAddress, promise); } } @Override public void bind (ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { if (logger.isEnabled(internalLevel)) { logger.log(internalLevel, format(ctx, "BIND" , localAddress)); } ctx.bind(localAddress, promise); }
从这里开始,又回到这个代码串的bind()
方法处,直到执行invokeBind()时,才跳转到LoggingHandler
的前驱,一个DefaultChannelPipeline#HeadContext
,它才是bind操作的主要流程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 @Override public void bind (ChannelHandlerContext, SocketAddress, ChannelPromise) { unsafe.bind(localAddress, promise); } protected abstract void doBind (SocketAddress localAddress) throws Exception;@Override public final void bind (final SocketAddress localAddress, final ChannelPromise promise) { boolean wasActive = isActive(); try { doBind(localAddress); } catch (Throwable t) { safeSetFailure(promise, t); closeIfClosed(); return ; } if (!wasActive && isActive()) { invokeLater(new Runnable () { @Override public void run () { pipeline.fireChannelActive(); } }); } safeSetSuccess(promise); } @Override protected void doBind (SocketAddress localAddress) throws Exception { if (PlatformDependent.javaVersion() >= 7 ) { javaChannel().bind(localAddress, config.getBacklog()); } else { javaChannel().socket().bind(localAddress, config.getBacklog()); } }
调用栈:
至此bind操作完成。
3. fireChannelActive 与fireChannelRegistered
的调用类似,也是从pipeline的head开始触发,只是事件的名称换了,并且多了一个设置OP_ACCEPT
的操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler , ChannelInboundHandler { @Override public void channelActive (ChannelHandlerContext ctx) { ctx.fireChannelActive(); readIfIsAutoRead(); } } private void readIfIsAutoRead () { if (channel.config().isAutoRead()) { channel.read(); } } @Override public Channel read () { pipeline.read(); return this ; } @Override public final ChannelPipeline read () { tail.read(); return this ; } @Override public ChannelHandlerContext read () { final AbstractChannelHandlerContext next = findContextOutbound(MASK_READ); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeRead(); } else { } return this ; } private void invokeRead () { if (invokeHandler()) { try { ((ChannelOutboundHandler) handler()).read(this ); } catch (Throwable t) { notifyHandlerException(t); } } else { read(); } } @Override public void read (ChannelHandlerContext ctx) { unsafe.beginRead(); } @Override public final void beginRead () { assertEventLoop(); if (!isActive()) { return ; } try { doBeginRead(); } catch (final Exception e) { invokeLater(new Runnable () { @Override public void run () { pipeline.fireExceptionCaught(e); } }); close(voidPromise()); } } @Override protected void doBeginRead () throws Exception { if (inputShutdown) { return ; } super .doBeginRead(); } @Override protected void doBeginRead () throws Exception { final SelectionKey selectionKey = this .selectionKey; if (!selectionKey.isValid()) { return ; } readPending = true ; final int interestOps = selectionKey.interestOps(); if ((interestOps & readInterestOp) == 0 ) { selectionKey.interestOps(interestOps | readInterestOp); } }
调用栈为:
通过eventLoop来执行task的方式,其实是通过一个taskQueue来接收这些Runnable,然后eventLoop再通过统一调度,获取taskQueue里面的task,然后依次执行它们。所以这里还有最后一个问题:eventLoop是怎么启动起来的以及如何进行调度 ?
EventLoop的启动与调度 时机:第一次在main线程中,调用EventLoop执行task的时候,会进行初始化。最早出现在initAndRegister()
这个方法中,这个方法进行register()的时候,会将register0放到一个Runnable中,扔给eventLoop去执行,即:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 eventLoop.execute(new Runnable () { @Override public void run () { register0(promise); } }); private void execute (Runnable task, boolean immediate) { boolean inEventLoop = inEventLoop(); addTask(task); if (!inEventLoop) { startThread(); if (isShutdown()) { boolean reject = false ; try { if (removeTask(task)) { reject = true ; } } catch (UnsupportedOperationException e) { } if (reject) { reject(); } } } if (!addTaskWakesUp && immediate) { wakeup(inEventLoop); } } private void startThread () { if (state == ST_NOT_STARTED) { if (STATE_UPDATER.compareAndSet(this , ST_NOT_STARTED, ST_STARTED)) { boolean success = false ; try { doStartThread(); success = true ; } finally { if (!success) { STATE_UPDATER.compareAndSet(this , ST_STARTED, ST_NOT_STARTED); } } } } } protected abstract void run () ;private void doStartThread () { assert thread == null ; executor.execute(new Runnable () { @Override public void run () { thread = Thread.currentThread(); if (interrupted) { thread.interrupt(); } boolean success = false ; updateLastExecutionTime(); try { SingleThreadEventExecutor.this .run(); success = true ; } } }); }
下面的代码是对整个事件处理的核心:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 @Override protected void run () { int selectCnt = 0 ; for (;;) { try { int strategy; try { strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks()); switch (strategy) { case SelectStrategy.CONTINUE: continue ; case SelectStrategy.BUSY_WAIT: case SelectStrategy.SELECT: long curDeadlineNanos = nextScheduledTaskDeadlineNanos(); if (curDeadlineNanos == -1L ) { curDeadlineNanos = NONE; } nextWakeupNanos.set(curDeadlineNanos); try { if (!hasTasks()) { strategy = select(curDeadlineNanos); } } finally { nextWakeupNanos.lazySet(AWAKE); } default : } } catch (IOException e) { rebuildSelector0(); selectCnt = 0 ; handleLoopException(e); continue ; } selectCnt++; cancelledKeys = 0 ; needsToSelectAgain = false ; final int ioRatio = this .ioRatio; boolean ranTasks; if (ioRatio == 100 ) { try { if (strategy > 0 ) { processSelectedKeys(); } } finally { ranTasks = runAllTasks(); } } else if (strategy > 0 ) { final long ioStartTime = System.nanoTime(); try { processSelectedKeys(); } finally { final long ioTime = System.nanoTime() - ioStartTime; ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } else { ranTasks = runAllTasks(0 ); } if (ranTasks || strategy > 0 ) { if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}." , selectCnt - 1 , selector); } selectCnt = 0 ; } else if (unexpectedSelectorWakeup(selectCnt)) { selectCnt = 0 ; } } catch (CancelledKeyException e) { if (logger.isDebugEnabled()) { logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?" , selector, e); } } catch (Throwable t) { handleLoopException(t); } try { if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { return ; } } } catch (Throwable t) { handleLoopException(t); } } }