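Once the server has started, it is already running the infinite-loop run() method of NioEventLoop, continuously polling for events; at this point the event it listens for is OP_ACCEPT. When a new connection comes in, it is first triggered inside that run() method…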
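Receiving a new connection

Once the server has started, it enters a state of waiting for events, which comes down to calling the JDK NIO API:

```java
// NioEventLoop.run(): with no pending tasks, wait in select()
if (!hasTasks()) {
    strategy = select(curDeadlineNanos);
}

// NioEventLoop.select(...) ends up in the JDK call
selector.select();
```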
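To make the waiting step concrete, here is a minimal sketch that uses only the JDK NIO API, without Netty. It registers a ServerSocketChannel for OP_ACCEPT, connects a client over the loopback interface, and waits in select() until the connection shows up. The class name AcceptWaitSketch, the method acceptOnce() and the 5-second timeout are assumptions made for illustration.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

final class AcceptWaitSketch {

    /** Returns true if the selector reported OP_ACCEPT and accept() produced a channel. */
    static boolean acceptOnce() throws Exception {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            // Port 0 lets the OS pick a free port on the loopback interface.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            server.register(selector, SelectionKey.OP_ACCEPT);

            InetSocketAddress address = (InetSocketAddress) server.getLocalAddress();
            try (SocketChannel client = SocketChannel.open(address)) {
                // Wait until an event is ready; the timeout is only a safety net.
                int ready = selector.select(5000);
                if (ready == 0) {
                    return false;
                }
                for (SelectionKey key : selector.selectedKeys()) {
                    if (key.isAcceptable()) {
                        try (SocketChannel accepted = server.accept()) {
                            return accepted != null;
                        }
                    }
                }
                return false;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("accepted: " + acceptOnce());
    }
}
```

Netty's event loop does the same kind of waiting, but in a loop that also runs queued tasks between selects.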
Once a new connection arrives, it is handled by processSelectedKeys(). That handling includes creating and initializing the NioSocketChannel.

```java
private void processSelectedKeys() {
    if (selectedKeys != null) {
        processSelectedKeysOptimized();
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

private void processSelectedKeysOptimized() {
    for (int i = 0; i < selectedKeys.size; ++i) {
        final SelectionKey k = selectedKeys.keys[i];
        // Clear the slot once the key has been taken out
        selectedKeys.keys[i] = null;

        final Object a = k.attachment();

        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }
    }
}

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    try {
        int readyOps = k.readyOps();
        // Both OP_ACCEPT (new connection) and OP_READ end up in unsafe.read()
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
    }
    // catch block elided in this excerpt
}
```
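The readyOps check in processSelectedKey() is a plain bitmask test. The small sketch below isolates that condition so it can be tried with different values; the class ReadyOpsSketch and the method triggersRead() are hypothetical names, and only the SelectionKey constants come from the JDK.

```java
import java.nio.channels.SelectionKey;

final class ReadyOpsSketch {

    /** Mirrors the condition that decides whether unsafe.read() is called. */
    static boolean triggersRead(int readyOps) {
        int readOrAccept = SelectionKey.OP_READ | SelectionKey.OP_ACCEPT;
        return (readyOps & readOrAccept) != 0 || readyOps == 0;
    }

    public static void main(String[] args) {
        System.out.println("OP_ACCEPT        -> " + triggersRead(SelectionKey.OP_ACCEPT));
        System.out.println("OP_WRITE         -> " + triggersRead(SelectionKey.OP_WRITE));
        System.out.println("OP_READ|OP_WRITE -> " + triggersRead(SelectionKey.OP_READ | SelectionKey.OP_WRITE));
        System.out.println("0                -> " + triggersRead(0));
    }
}
```

For the server channel the ready bit is OP_ACCEPT, which is why a new connection enters the same unsafe.read() path that ordinary reads use.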
Creating and initializing the NioSocketChannel

The concrete implementation of the unsafe above is an inner class named NioMessageUnsafe. Its read() method does two things:
① It creates the NioSocketChannel.
② It initializes the NioSocketChannel through a Handler in the pipeline, namely ServerBootstrap$ServerBootstrapAcceptor.

```java
@Override
public void read() {
    assert eventLoop().inEventLoop();
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.reset(config);

    boolean closed = false;
    Throwable exception = null;
    try {
        try {
            do {
                // ① Create a NioSocketChannel for the accepted connection
                int localRead = doReadMessages(readBuf);
                if (localRead == 0) {
                    break;
                }
                if (localRead < 0) {
                    closed = true;
                    break;
                }

                allocHandle.incMessagesRead(localRead);
            } while (allocHandle.continueReading());
        } catch (Throwable t) {
            exception = t;
        }

        int size = readBuf.size();
        for (int i = 0; i < size; i++) {
            readPending = false;
            // ② Hand each new channel to the pipeline for initialization
            pipeline.fireChannelRead(readBuf.get(i));
        }
        readBuf.clear();
        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();
    }
    // exception handling and the finally block are elided in this excerpt
}
```
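Creation

The main flow for creating a NioSocketChannel is to obtain a SocketChannel through the JDK API first and then pass it to the NioSocketChannel constructor. Seen from another angle, NioSocketChannel can be understood as a wrapper around SocketChannel.

```java
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    // Accept the connection through the JDK API
    SocketChannel ch = SocketUtils.accept(javaChannel());

    try {
        if (ch != null) {
            // Wrap the JDK SocketChannel in a NioSocketChannel
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    }
    // catch block elided in this excerpt

    return 0;
}
```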
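The interplay between the read loop and doReadMessages() can be modelled without any sockets. The sketch below is a toy: pending connections are plain strings in a queue, WrappedChannel is a hypothetical stand-in for the wrapper, and maxMessagesPerRead is an illustrative cap that plays the role of allocHandle.continueReading().

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

final class AcceptLoopSketch {

    /** Hypothetical wrapper: it only holds the raw connection it was given. */
    static final class WrappedChannel {
        final String rawConnection;

        WrappedChannel(String rawConnection) {
            this.rawConnection = rawConnection;
        }
    }

    private final Queue<String> pending = new ArrayDeque<>(); // simulated accept backlog
    private final int maxMessagesPerRead;                     // illustrative cap per pass
    final List<String> fired = new ArrayList<>();             // what reached the "pipeline"

    AcceptLoopSketch(int maxMessagesPerRead, String... connections) {
        this.maxMessagesPerRead = maxMessagesPerRead;
        pending.addAll(Arrays.asList(connections));
    }

    /** Wraps at most one pending connection; returns 1 if one was wrapped, else 0. */
    int doReadMessages(List<Object> buf) {
        String raw = pending.poll(); // null when nothing is waiting
        if (raw != null) {
            buf.add(new WrappedChannel(raw));
            return 1;
        }
        return 0;
    }

    /** One pass of the read loop: collect wrapped channels, then "fire" each of them. */
    void read() {
        List<Object> readBuf = new ArrayList<>();
        int messagesRead = 0;
        do {
            int localRead = doReadMessages(readBuf);
            if (localRead == 0) {
                break;
            }
            messagesRead += localRead;
        } while (messagesRead < maxMessagesPerRead);

        for (Object msg : readBuf) {
            fired.add(((WrappedChannel) msg).rawConnection);
        }
    }

    public static void main(String[] args) {
        AcceptLoopSketch loop = new AcceptLoopSketch(2, "conn-a", "conn-b", "conn-c");
        loop.read();
        System.out.println("after first pass:  " + loop.fired);
        loop.read();
        System.out.println("after second pass: " + loop.fired);
    }
}
```

With a cap of 2 and three pending connections, the third connection is only picked up on the next pass, which is the effect the do/while condition has in the real loop.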
Initialization

Initialization relies mainly on events being passed through the pipeline. For example, registering the channel with an EventLoop is carried out by a Handler in the pipeline, ServerBootstrapAcceptor.

```java
// In NioMessageUnsafe.read()
pipeline.fireChannelRead(readBuf.get(i));

// DefaultChannelPipeline
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
}

// AbstractChannelHandlerContext
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}

private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRead(msg);
    }
}
```
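From here on, passing the event to the following handlers always goes through the two invokeChannelRead methods above, which call the corresponding channelRead() method of each subsequent handler. At this point the pipeline contains these handlers:

```
1) DefaultChannelPipeline$HeadContext
2) io.netty.handler.logging.LoggingHandler
3) io.netty.bootstrap.ServerBootstrap$ServerBootstrapAcceptor
4) DefaultChannelPipeline$TailContext
```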
Of these, head simply passes the read event along:

```java
// DefaultChannelPipeline.HeadContext
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    ctx.fireChannelRead(msg);
}

// AbstractChannelHandlerContext
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
    invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
    return this;
}
```
LoggingHandler just writes a log entry and passes the event on:

```java
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (logger.isEnabled(internalLevel)) {
        logger.log(internalLevel, format(ctx, "READ", msg));
    }
    ctx.fireChannelRead(msg);
}
```
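The hand-over from one handler to the next can be pictured as a walk along a linked list of contexts. The following toy model is only a sketch of that idea and not Netty's implementation: Context, InboundHandler and the handler names are all made up for illustration, and the third handler deliberately does not forward the event, just as ServerBootstrapAcceptor consumes the new child channel.

```java
import java.util.ArrayList;
import java.util.List;

final class InboundChainSketch {

    interface InboundHandler {
        void channelRead(Context ctx, Object msg);
    }

    /** A node in the chain: holds one handler and a link to the next node. */
    static final class Context {
        final String name;
        final InboundHandler handler;
        Context next;

        Context(String name, InboundHandler handler) {
            this.name = name;
            this.handler = handler;
        }

        /** Passes the event to the next context, if there is one. */
        void fireChannelRead(Object msg) {
            if (next != null) {
                next.handler.channelRead(next, msg);
            }
        }
    }

    /** Builds head -> logging -> acceptor -> tail and sends one message through it. */
    static List<String> run(Object msg) {
        List<String> trace = new ArrayList<>();
        // A handler that records its name and forwards the event.
        InboundHandler passThrough = (ctx, m) -> {
            trace.add(ctx.name);
            ctx.fireChannelRead(m);
        };

        Context head = new Context("head", passThrough);
        Context logging = new Context("logging", passThrough);
        // The acceptor consumes the message and does not forward it.
        Context acceptor = new Context("acceptor", (ctx, m) -> trace.add(ctx.name + " consumed " + m));
        Context tail = new Context("tail", passThrough);

        head.next = logging;
        logging.next = acceptor;
        acceptor.next = tail;

        head.handler.channelRead(head, msg);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run("child-channel"));
    }
}
```

Because the acceptor stops the propagation, tail never sees the message in this model.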
When the event reaches the channelRead() method of ServerBootstrapAcceptor, initialization really begins (the current thread is still an EventLoop in bossGroup):

```java
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;

    // Add the configured childHandler to the child's pipeline
    child.pipeline().addLast(childHandler);

    setChannelOptions(child, childOptions, logger);
    setAttributes(child, childAttrs);

    try {
        // Register the child with an EventLoop from childGroup
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}
```
Registration works much like binding during server startup. First an EventLoop is chosen; there are two selection strategies, and which one is used depends on the number of threads (whether it is a power of two). After a few hops, execution arrives at register0(), the method that does the main registration work. The current thread, however, is an EventLoop in bossGroup, while register0() has to run on a thread in workGroup. So the task is first put into a queue, then the thread is started and enters NioEventLoop's run() loop, which keeps checking for ready events and running queued tasks, and eventually executes this task.

```java
// MultithreadEventLoopGroup: choose an EventLoop
@Override
public ChannelFuture register(Channel channel) {
    return next().register(channel);
}

// SingleThreadEventLoop
@Override
public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}

@Override
public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
}

// AbstractChannel.AbstractUnsafe
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    AbstractChannel.this.eventLoop = eventLoop;

    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            // Called from the boss thread, so register0() is queued on the chosen EventLoop
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        }
        // catch block elided in this excerpt
    }
}
```
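The two ways of choosing an EventLoop can be sketched as two round-robin index functions: a bit mask when the number of loops is a power of two, and a modulo otherwise. The code below is an illustration of that idea under my own naming (ChooserSketch, Chooser, newChooser); it is not copied from Netty and assumes the size is positive.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class ChooserSketch {

    interface Chooser {
        int nextIndex();
    }

    static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    /** Returns a round-robin chooser over indices 0..size-1 (size must be positive). */
    static Chooser newChooser(final int size) {
        final AtomicInteger idx = new AtomicInteger();
        if (isPowerOfTwo(size)) {
            // For a power of two, masking with size - 1 is equivalent to modulo.
            return () -> idx.getAndIncrement() & (size - 1);
        }
        // General case: modulo, with abs() guarding against counter overflow.
        return () -> Math.abs(idx.getAndIncrement() % size);
    }

    public static void main(String[] args) {
        Chooser four = newChooser(4);
        Chooser three = newChooser(3);
        for (int i = 0; i < 5; i++) {
            System.out.println("size 4 -> " + four.nextIndex() + ", size 3 -> " + three.nextIndex());
        }
    }
}
```

Both variants hand out the loops in turn; the mask version only avoids the division.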
At this point the thread has switched to an EventLoop in workGroup. It performs several operations: first it calls the JDK API to register and obtain the selectionKey; then it fires the corresponding events; finally it changes interestOps to OP_READ.

```java
private void register0(ChannelPromise promise) {
    try {
        boolean firstRegistration = neverRegistered;
        // Register with the JDK selector
        doRegister();
        neverRegistered = false;
        registered = true;

        pipeline.invokeHandlerAddedIfNeeded();

        safeSetSuccess(promise);
        // Fire the registered event
        pipeline.fireChannelRegistered();
        if (isActive()) {
            if (firstRegistration) {
                // Fire the active event; this is where interestOps ends up as OP_READ
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                beginRead();
            }
        }
    }
    // catch block elided in this excerpt
}
```
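The switch from the calling thread to the worker thread comes from the "run inline if already on the loop thread, otherwise enqueue" pattern. Below is a minimal sketch of that pattern built on a single-thread executor from java.util.concurrent. The Loop class, the thread name "worker-loop" and the use of CompletableFuture in place of a promise are assumptions for illustration only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class HandOffSketch {

    /** Hypothetical stand-in for an event loop backed by exactly one thread. */
    static final class Loop {
        private volatile Thread thread;
        private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "worker-loop");
            thread = t; // remember the loop thread for inEventLoop()
            return t;
        });

        boolean inEventLoop() {
            return Thread.currentThread() == thread;
        }

        /** Completes the returned future with the name of the thread that did the work. */
        CompletableFuture<String> register() {
            CompletableFuture<String> promise = new CompletableFuture<>();
            Runnable register0 = () -> promise.complete(Thread.currentThread().getName());
            if (inEventLoop()) {
                register0.run();            // already on the loop thread: run inline
            } else {
                executor.execute(register0); // otherwise queue it for the loop thread
            }
            return promise;
        }

        void shutdown() {
            executor.shutdown();
        }
    }

    /** Calls register() from the current thread and reports where the work ran. */
    static String demo() throws Exception {
        Loop loop = new Loop();
        try {
            return loop.register().get(5, TimeUnit.SECONDS);
        } finally {
            loop.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("caller: " + Thread.currentThread().getName());
        System.out.println("register0 ran on: " + demo());
    }
}
```

The caller gets a future back immediately, while the actual work runs later on the loop's own thread, which is the same shape as the boss thread handing register0() to a worker EventLoop.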
Calling the JDK API to register the selectionKey:

```java
@Override
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            // Registered with interestOps 0; OP_READ is added later in doBeginRead()
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        }
        // catch block elided in this excerpt
    }
}
```
While the channel-active event is being propagated, the interestOps of the NioSocketChannel is changed to OP_READ. The code below shows the event travelling through the pipeline; it follows the same pattern as the analysis above, so it is not discussed again here.

```java
// DefaultChannelPipeline
@Override
public final ChannelPipeline fireChannelActive() {
    AbstractChannelHandlerContext.invokeChannelActive(head);
    return this;
}

// AbstractChannelHandlerContext
static void invokeChannelActive(final AbstractChannelHandlerContext next) {
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelActive();
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelActive();
            }
        });
    }
}

private void invokeChannelActive() {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelActive(this);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelActive();
    }
}
```
The only difference is that the head of the pipeline, HeadContext, handles the active event with one extra step that deals with interestOps:

```java
@Override
public void channelActive(ChannelHandlerContext ctx) {
    ctx.fireChannelActive();

    // Extra step: start reading if auto-read is enabled
    readIfIsAutoRead();
}

private void readIfIsAutoRead() {
    if (channel.config().isAutoRead()) {
        channel.read();
    }
}
```
This propagates a read event through the pipeline, but starting from tail: tail.read() looks for the next outbound handler in the direction of head, and the event ends in HeadContext's read() method, which calls unsafe.beginRead():

```java
// AbstractChannel
@Override
public Channel read() {
    pipeline.read();
    return this;
}

// DefaultChannelPipeline
@Override
public final ChannelPipeline read() {
    tail.read();
    return this;
}

// AbstractChannelHandlerContext
@Override
public ChannelHandlerContext read() {
    final AbstractChannelHandlerContext next = findContextOutbound(MASK_READ);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeRead();
    } else {
        Tasks tasks = next.invokeTasks;
        if (tasks == null) {
            next.invokeTasks = tasks = new Tasks(next);
        }
        executor.execute(tasks.invokeReadTask);
    }

    return this;
}

private void invokeRead() {
    if (invokeHandler()) {
        try {
            ((ChannelOutboundHandler) handler()).read(this);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        read();
    }
}

// DefaultChannelPipeline.HeadContext
@Override
public void read(ChannelHandlerContext ctx) {
    unsafe.beginRead();
}
```
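The backward walk done by findContextOutbound(MASK_READ) can be illustrated with a toy chain in which every context carries a bit mask of the events its handler cares about. This is a sketch of the idea only: the mask values, the Context class and the handler names below are hypothetical and do not match Netty's internal constants.

```java
final class OutboundWalkSketch {

    // Hypothetical flag values, chosen only for this illustration.
    static final int MASK_CHANNEL_READ = 1; // inbound: handler wants channelRead
    static final int MASK_READ = 2;         // outbound: handler wants read

    static final class Context {
        final String name;
        final int mask;
        Context prev;

        Context(String name, int mask) {
            this.name = name;
            this.mask = mask;
        }
    }

    /**
     * Walks from the given context towards the head and returns the first
     * context whose mask contains the requested bit. The head must carry the
     * bit, otherwise the walk would run off the start of the chain.
     */
    static Context findContextOutbound(Context from, int mask) {
        Context ctx = from;
        do {
            ctx = ctx.prev;
        } while ((ctx.mask & mask) == 0);
        return ctx;
    }

    /** Builds head <- inboundOnly <- tail and resolves an outbound read from tail. */
    static String firstReadHandlerFromTail() {
        Context head = new Context("head", MASK_CHANNEL_READ | MASK_READ);
        Context inboundOnly = new Context("inboundOnly", MASK_CHANNEL_READ);
        Context tail = new Context("tail", MASK_CHANNEL_READ);
        inboundOnly.prev = head;
        tail.prev = inboundOnly;
        return findContextOutbound(tail, MASK_READ).name;
    }

    public static void main(String[] args) {
        System.out.println("read() from tail is handled by: " + firstReadHandlerFromTail());
    }
}
```

Handlers that do not take part in outbound read are skipped, so in a pipeline without such handlers the call lands directly on head.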
This brings us to the main logic that modifies interestOps:

```java
// AbstractChannel.AbstractUnsafe
@Override
public final void beginRead() {
    assertEventLoop();

    if (!isActive()) {
        return;
    }

    try {
        doBeginRead();
    } catch (final Exception e) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireExceptionCaught(e);
            }
        });
        close(voidPromise());
    }
}

// AbstractNioChannel
@Override
protected void doBeginRead() throws Exception {
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;

    final int interestOps = selectionKey.interestOps();
    // Add the read interest (OP_READ for a NioSocketChannel) if it is not set yet
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}
```
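The two-step pattern, registering with an interest set of 0 and adding OP_READ afterwards, can be reproduced with the JDK API alone. The sketch below uses an unconnected SocketChannel purely so that it runs without a peer; the class InterestOpsSketch, the method name and the string attachment are assumptions for illustration.

```java
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

final class InterestOpsSketch {

    /** Returns { interestOps right after registration, interestOps after enabling read }. */
    static int[] registerThenEnableRead() throws Exception {
        try (Selector selector = Selector.open();
             SocketChannel channel = SocketChannel.open()) {
            channel.configureBlocking(false);

            // Step 1: register with no interest at all, like doRegister().
            SelectionKey key = channel.register(selector, 0, "attachment");
            int before = key.interestOps();

            // Step 2: add the read interest only if it is missing, like doBeginRead().
            int readInterestOp = SelectionKey.OP_READ;
            int interestOps = key.interestOps();
            if ((interestOps & readInterestOp) == 0) {
                key.interestOps(interestOps | readInterestOp);
            }
            return new int[] {before, key.interestOps()};
        }
    }

    public static void main(String[] args) throws Exception {
        int[] ops = registerThenEnableRead();
        System.out.println("after register: " + ops[0] + ", after beginRead-style update: " + ops[1]);
    }
}
```

Using a bitwise OR keeps any interest bits that were already set, so the update is safe to repeat.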
At this point, handling of the newly established connection is essentially complete.