5 | Netty:服务端启动流程分析

目前对于Netty的理解是:一套完善了Java NIO操作的框架,因为Netty的最底层还是调用jdk的nio相关的API,但是又在jdk的nio基础上做了很多的封装,并衍生出来了自己相关的概念。

服务启动的主线操作

EchoServer为例,一条可参考的服务启动的主线操作如下:

main thread

  1. 创建selector
  2. 创建serversocketchannel
  3. 初始化serversocketchannel
  4. 给serversocketchannel从bossgroup中选择一个NioEventLoop

boss thread

  1. 将serversocketchannel注册到选择的NioEventLoop的selector
  2. 绑定地址启动
  3. 注册接受连接事件(OP_ACCEPT)到selector上

对应到代码中的操作依次为:

1
2
3
4
5
6
7
8
9
10
// 1. 创建selector
Selector selector = sun.nio.ch.SelectorProviderImpl.openSelector();
// 2. 创建serversocketchannel
ServerSocketChannel serverSocketChannel = provider.openServerSocketChannel();
// 将serversocketchannel注册到选择的NioEventLoop的selector
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
// 绑定地址启动
javaChannel().bind(localAddress, config.getBacklog());
// 注册接受连接事件(OP_ACCEPT)到selector上
selectionKey.interestOps(OP_ACCEPT);

根据上面的启动主线,以它为一个参考,我觉得这个主线过于简略,也就是说很多操作并不能体现出来。所以我将创建一个server的步骤分成如下3个大的步骤。下面会对上述3个大步骤做适当、尽可能详细的分析,并将之前看过的源码内容与服务创建联系起来。

创建EventLoopGroup

EventLoopGroup的个数决定具体reactor模式,在EchoServer中使用了两个EventLoopGroup,也就是使用了主从Reactor多线程模式;而EventLoopGroup的类型决定使用的IO模式,这里使用的是``NioEventLoopGroup也就是使用的IO模式为NIO。对应于EchoServer`中的代码为:

1
2
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

到这里有若干个问题,但是主线中不会提及,因为主线关注的是操作链条,在大致了解主线操作之后,加强对个各个细节处的理解,才能理解得更加透彻。这里的问题主要来自心中的疑问。

什么是EventLoopGroupEventLoop

之前的初略理解是:EventLoopGroup是一个线程池、EventLoop则对应一个线程。

EventLoopGroup是一个接口,有多种实现方式,在EchoServer中,它的实现是NioEventLoopGroup

ChannelEventLoopEventLoopGroup之间的关系如下图所示:

  • 一个EventLoopGroup包含一个或者多个EventLoop

  • 一个EventLoop在它的生命周期内只和一个Thread绑定;

  • 所有由EventLoop处理的I/O事件都将在它专有的Thread上被处理;

  • 一个Channel在它的生命周期内只注册于一个EventLoop

  • 一个EventLoop可能会被分配给一个或多个Channel

为什么new NioEventLoopGroup(1)传1

在前面的一篇文章中有分析过为什么传1,因为对于boss group来说,只会有一个channel,所以只绑定1个线程,也就只需要1个EventLoop

如果不传任何参数,线程数在NioEventLoopGroup中会先传0,在父类MultithreadEventLoopGroup先再做判断,如果为0,那么会默认为:

1
2
3
4
5
6
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

获取SelectorProvider

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 在NioEventLoopGroup.java中
public NioEventLoopGroup(ThreadFactory threadFactory) {
this(0, threadFactory, SelectorProvider.provider());
}

// 在SelectorProvider.java中
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<SelectorProvider>() {
public SelectorProvider run() {
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}

其中sun.nio.ch.DefaultSelectorProvider在不同平台jdk中、其实现不一样,从而以此达到,统一不同平台下Selector的实现。其中Windows下的实现如下:

1
2
3
4
5
6
7
8
public class DefaultSelectorProvider {
private DefaultSelectorProvider() {
}

public static SelectorProvider create() {
return new WindowsSelectorProvider();
}
}

根据线程数创建EventExecutor数组并初始化

MultithreadEventExecutorGroup的构造函数中,会初始化一个EventExecutor数组,其实这就是NioEventLoop数组。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
children = new EventExecutor[nThreads];

for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
// 创建EventLoopGroup失败,释放资源
}
}
}
// chooserFactory是用来创建一个选择器的工厂类。
chooser = chooserFactory.newChooser(children);

选择器是用来选择一个EventLoop,供进行事件处理。具体有两种策略,一种是EventLoop个数为2的倍数(通过&运算)、一种是普通的(通过%运算)。说真的,我对这两者的差异没有啥感觉。

对于newChild()这个方法,我们找到对应的NioEventLoopGroup的实现:

也就是说,在初始化NioEventLoopGroup的时候,就已经将所有的NioEventLoop初始化完成。

NioEventLoop的初始化

  1. 打开openSelector()操作。说明selector在初始化NioEventLoop时就被打开。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    final SelectorTuple selectorTuple = openSelector();

    private SelectorTuple openSelector() {
    final Selector unwrappedSelector;
    try {
    unwrappedSelector = provider.openSelector();
    } catch (IOException e) {
    throw new ChannelException("failed to open a new selector", e);
    }
    ...
    }
  2. 初始化一个包装Runnable后的executor。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    this.executor = ThreadExecutorMap.apply(executor, this);

    public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {
    ObjectUtil.checkNotNull(executor, "executor");
    ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
    return new Executor() {
    @Override
    public void execute(final Runnable command) {
    executor.execute(apply(command, eventExecutor));
    }
    };
    }
    // 包装Runnable后,再返回一个包装后的Runnable
    public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) {
    ObjectUtil.checkNotNull(command, "command");
    ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
    return new Runnable() {
    @Override
    public void run() {
    setCurrentEventExecutor(eventExecutor);
    try {
    command.run();
    } finally {
    setCurrentEventExecutor(null);
    }
    }
    };
    }

通过Bootstrap设置相关参数

绑定端口并启动

这一步骤有多个重要的过程。

创建NioServerSocketChannel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 指定channel类型为NioServerSocketChannel
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
// 设置一个生成channel的工厂类
public B channel(Class<? extends C> channelClass) {
return channelFactory(new ReflectiveChannelFactory<C>(
ObjectUtil.checkNotNull(channelClass, "channelClass")
));
}

public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) {
return channelFactory((ChannelFactory<C>) channelFactory);
}

public B channelFactory(ChannelFactory<? extends C> channelFactory) {
ObjectUtil.checkNotNull(channelFactory, "channelFactory");
if (this.channelFactory != null) {
throw new IllegalStateException("channelFactory set already");
}
// 后续channel由channleFactory产生
this.channelFactory = channelFactory;
return self();
}

此处的channelFactory是一个ReflectiveChannelFactory,它的实现比较简单,通过class获取到构造器,然后通过构造器即可创建出新的对象。简单的逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public ReflectiveChannelFactory(Class<? extends T> clazz) {
ObjectUtil.checkNotNull(clazz, "clazz");
try {
this.constructor = clazz.getConstructor();
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException(...);
}
}

@Override
public T newChannel() {
try {
return constructor.newInstance();
} catch (Throwable t) {
throw new ChannelException(...);
}
}

初始化完channelFactory之后,进入doBind()阶段,此时会通过一个channelFactory来创建一个新的对象,即NioServerSocketChannel。如下:

1
2
3
4
5
6
7
8
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
// ...
}
// ...
}

NioServerSocketChannel创建的时候,会创建一个ServerSocketChannel,调用栈依次为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// SelectorProvider.provider()可参见前文中的内容
SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent, ch, readInterestOp);
}

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
// ...
}
}

protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
// 添加一个默认的pipeline,类型为DefaultChannelPipeline
pipeline = newChannelPipeline();
}

protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}

初始化刚创建的NioServerSocketChannel

1
2
3
4
5
6
7
8
9
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
// ...
}
// ...
}

init()中,主要是给新创建的NioServerSocketChannel设置参数,然后在它的pipeline上面添加一个ChannelHandler,用来处理新建立的连接,也就是ServerBootstrapAcceptor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Override
void init(Channel channel) {
// ...
ChannelPipeline p = channel.pipeline();
// ...
// 此处的执行p.addLast时,因为channel还未注册到eventloop上,
// 所以会将这个ChannelInitializer保存到ctx中,
// 并将ctx封装成一个PendingHandlerAddedTask,添加到pipeline中,
// 等待注册到eventloop后触发
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}

ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}

其中,p.addLast()执行的时候,如果当前channel,即NioServerSocketChannel,没有注册到一个EventLoop,那么将会以PendingHandlerCallback的形式,保存到pipeline的pengdingHandlerCallbackHead这个链表上。

其中PendingHandlerCallback可以简单理解成:只是一个保存有AbstractChannelHandlerContextRunnable,并且具有单向链表结构。它的设计目的,就是等待某个时候被执行。

注册channel到eventloop

ChannelFuture regFuture = config().group().register(channel);开始,一路可以追踪到AbstractChannelregister方法里面。刚开始有一段状态判断的代码,这段代码可以印证前面的一个说法:一个channel只能注册到一个eventloop

接着在eventLoop的线程中执行register0()方法,也就是说main线程开始返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 紧接上面的代码
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
}
// ...
}

也就是说,initAndRegister()返回之后会拿到一个ChannelFuture,

1
2
3
4
5
6
7
8
9
10
11
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
}
// ...
ChannelFuture regFuture = config().group().register(channel);
// ...
return regFuture;
}

如果在eventLoop中执行register0()完毕,那么将继续执行doBind0(),在main线程中;如果没执行完毕,会等register0()在eventLoop中执行完毕之后,再执行doBind0(),此时的线程是eventLoop。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
// ...
if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
//...
doBind0(regFuture, channel, localAddress, promise);
}
});
return promise;
}
}

至此,EchoServerChannelFuture f = b.bind(PORT).sync()中的b.bind(PORT)在main线程中的操作已经完成。所以当前步骤又分成了两个,即:1. register0;2. doBind0。

1. register0

此函数关键操作的流程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// AbstractChannel.java
private void register0(ChannelPromise promise) {
try {
// ...
// 一个由子类继承的方法,这里是AbstractNioChannel的实现,即:
// selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
doRegister();
neverRegistered = false;
// channel变成已注册到EventLoop,若该channel再注册到其他EventLoop会失败。
registered = true;

// 执行之前未绑定EventLoop时,添加的PendingHandlerCallback
// 详细可以参考下面的执行PendingHandlerCallback的流程
pipeline.invokeHandlerAddedIfNeeded();

// 此方法在设置了promise的执行状态后,再通过观察者模式,
// 通知所有的ChannelFutureListener,也就是会接着执行doBind0()。
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
执行PendingHandlerCallback的流程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// DefaultChannelPipeline.java
final void invokeHandlerAddedIfNeeded() {
assert channel.eventLoop().inEventLoop();
if (firstRegistration) {
firstRegistration = false;
// 已将Channel注册到EventLoop,现在开始执行handlerAdded这个回调。
callHandlerAddedForAllHandlers();
}
}

// DefaultChannelPipeline.java
private void callHandlerAddedForAllHandlers() {
final PendingHandlerCallback pendingHandlerCallbackHead;
synchronized (this) {
assert !registered;
// 设置已注册的标志位
registered = true;
// 获取之前的pendingHandlerCallbackHead, 并将其置null
pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
// 方便GC
this.pendingHandlerCallbackHead = null;
}

// This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while
// holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside
// the EventLoop.
PendingHandlerCallback task = pendingHandlerCallbackHead;
// 依次遍历执行所有的pendingHandlerCallback
while (task != null) {
task.execute();
task = task.next;
}
}

PendingHandlerAddedTask的结构在前面已有描述,这里补充一下它的代码实现。其实task.execute();最终执行的代码是callHandlerAdded0(ctx)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// DefaultChannelPipeline.java
private abstract static class PendingHandlerCallback implements Runnable {
// 保存ctx
final AbstractChannelHandlerContext ctx;
// 单向链表
PendingHandlerCallback next;
PendingHandlerCallback(AbstractChannelHandlerContext ctx) {
this.ctx = ctx;
}
abstract void execute();
}

private final class PendingHandlerAddedTask extends PendingHandlerCallback {
// ...
@Override
public void run() {
callHandlerAdded0(ctx);
}

@Override
void execute() {
EventExecutor executor = ctx.executor();
// 此时执行的线程是eventLoop
if (executor.inEventLoop()) {
callHandlerAdded0(ctx);
} else {
try {
executor.execute(this);
} catch (RejectedExecutionException e) {
// ...
atomicRemoveFromHandlerList(ctx);
ctx.setRemoved();
}
}
}
}

执行完ChanneInitializer之后,将其从pipeline中移除。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// DefaultChannelPipeline.java
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
try {
ctx.callHandlerAdded();
} catch (Throwable t) {
boolean removed = false;
try {
atomicRemoveFromHandlerList(ctx);
ctx.callHandlerRemoved();
removed = true;
}
// ...
}
}

// AbstractChannelHandlerContext.java
final void callHandlerAdded() throws Exception {
// We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates
// any pipeline events ctx.handler() will miss them because the state will not allow it.
// 上述注释存疑,可待后续分析
if (setAddComplete()) {
// handler在此处即ChannelInitialzer
handler().handlerAdded(this);
}
}

// ChannelInitialzer.java
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
// This should always be true with our current DefaultChannelPipeline implementation.
// The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering
// surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers
// will be added in the expected order.
if (initChannel(ctx)) {

// We are done with init the Channel, removing the initializer now.
removeState(ctx);
}
}
}

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.add(ctx)) { // Guard against re-entrance.
try {
// 调用initChannel()
initChannel((C) ctx.channel());
} catch (Throwable cause) {
// ...
} finally {
ChannelPipeline pipeline = ctx.pipeline();
if (pipeline.context(this) != null) {
// 移除ChannelInitializer
pipeline.remove(this);
}
}
return true;
}
return false;
}

protected abstract void initChannel(C ch) throws Exception;

再来看一眼当时的initChannel是如何实现的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// ServerBootStrap.java
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}

ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});

此时的pipeline.addLast(new ServerBootstrapAcceptor...)中的ServerBootstrapAcceptor,是不是也会像ChannleInitializer一样被删除呢?

从直觉上来说,是不会的,因为如果删掉了谁来处理新建立的连接,那么它为什么没有被删掉呢?因为ServerBootstrapAcceptor并没有重写handlerAdded()这个方法,使用的父类默认实现,即啥都不干。它与ChannelInitializer同是继承自ChannelInboundHandlerAdapter,但是ChannelInitializer重写了handlerAdded(), 并在这个函数中,在初始化之后,将自身从pipeline上移除掉了。因为它只需要执行一次就即可。

safeSetSuccess(promise)

使用了观察者模式,将执行之前注册到ChannelFuture上面的ChannelFutureListener,也就是会接着执行doBind0()。这部分代码逻辑比较明了,看关键的代码片段即可:

经典的观察者模式,套路满满。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 多个Listener的时候,循环里面调用notifyListener0()方法
private void notifyListeners0(DefaultFutureListeners listeners) {
GenericFutureListener<?>[] a = listeners.listeners();
int size = listeners.size();
for (int i = 0; i < size; i ++) {
notifyListener0(this, a[i]);
}
}
// 单个Listener的时候执行
private static void notifyListener0(Future future, GenericFutureListener l) {
try {
l.operationComplete(future);
} catch (Throwable t) {
//...
}
}

执行此ChannelFutureListener,也就是进入doBind0()阶段,这个操作内容不多,只是将bind的具体操作放到了Runnable中,然后扔到eventLoop的taskQueue中,等待下次eventLoop执行该task。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// AbstractBootstrap.java
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// ...
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();

doBind0(regFuture, channel, localAddress, promise);
}
}
});

private static void doBind0(...) {
// This method is invoked before channelRegistered() is triggered.
// Give user handlers a chance to set up the pipeline
// in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
fireChannelRegistered

关于消息的传递,可以放到后续的连接建立后的消息发送模块。这里先提出我的几个问题:

  • 事件如何在pipeline中传递?
  • 什么是executionMask?这个实现看起来很有趣,通过executionMask与对应的事件编码相与,得出此Handler是否可以处理此消息。详细可后续进一步分析。

至此register0的操作完成。

2. doBind0

框架代码的流程:从pipeline的tail出发,触发bind事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
// AbstractBootstrap.java
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);

// AbstractChannel.java
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}

// DefaultChannelPipeline.java
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return tail.bind(localAddress, promise);
}

此处的tail是LoggingHandler,也就是说会打印出bind日志:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// AbstractChannelHandlerContext.java
@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
// ...
// 从tail,即DefaultChannelPipeline#TailContext开始,往前传递bind事件。
final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
// tail的前驱是LoggingHandler
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeBind(localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeBind(localAddress, promise);
}
}, promise, null, false);
}
return promise;
}

private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
if (invokeHandler()) {
try {
// 执行LogginHandler的bind方法。
((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
bind(localAddress, promise);
}
}

// LogginHandler.java
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
if (logger.isEnabled(internalLevel)) {
logger.log(internalLevel, format(ctx, "BIND", localAddress));
}
// 继续触发bind事件,必须传递,否则会出问题。
ctx.bind(localAddress, promise);
}

从这里开始,又回到这个代码串的bind()方法处,直到执行invokeBind()时,才跳转到LoggingHandler的前驱,一个DefaultChannelPipeline#HeadContext,它才是bind操作的主要流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// DefaultChannelPipeline#HeadContext
@Override
public void bind(ChannelHandlerContext, SocketAddress, ChannelPromise) {
unsafe.bind(localAddress, promise);
}

// AbstractChannel.java
protected abstract void doBind(SocketAddress localAddress) throws Exception;
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
// ...
boolean wasActive = isActive();
try {
// 实际bind操作,调用jdk的api
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}

if (!wasActive && isActive()) {
// 只是将此Runnable添加到taskQueue中,等待下次触发执行
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
//
safeSetSuccess(promise);
}

// NioServerSocketChannel.java
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}

调用栈:

至此bind操作完成。

3. fireChannelActive

fireChannelRegistered的调用类似,也是从pipeline的head开始触发,只是事件的名称换了,并且多了一个设置OP_ACCEPT的操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
// DefaultChannelPipeline.java
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
@Override
public void channelActive(ChannelHandlerContext ctx) {
// 传递给pipeline中head后面的handler
ctx.fireChannelActive();
// 设置OP_ACCEPT事件
readIfIsAutoRead();
}
//...
}

private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
channel.read();
}
}

// AbstractChannel.java
@Override
public Channel read() {
pipeline.read();
return this;
}

// DefaultChannelPipeline.java
@Override
public final ChannelPipeline read() {
// tail即TailContext
tail.read();
return this;
}

// AbstractChannelHandlerContext.java
@Override
public ChannelHandlerContext read() {
// next此时为HeadContext也就是head
final AbstractChannelHandlerContext next = findContextOutbound(MASK_READ);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) { // 执行线程为eventloop
next.invokeRead();
} else {
// ...
}

return this;
}

private void invokeRead() {
if (invokeHandler()) {
try {
((ChannelOutboundHandler) handler()).read(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
read();
}
}

// DefaultChannelPipeline.java$HeadContext
@Override
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
}

// AbstractChannel.java$AbstractUnsafe
@Override
public final void beginRead() {
assertEventLoop();

if (!isActive()) {
return;
}

try {
doBeginRead();
} catch (final Exception e) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireExceptionCaught(e);
}
});
close(voidPromise());
}
}

// AbstractNioMessageChannel.java
@Override
protected void doBeginRead() throws Exception {
if (inputShutdown) {
return;
}
super.doBeginRead();
}

// AbstractNioChannel.java
@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}

readPending = true;

final int interestOps = selectionKey.interestOps();
// 将channel注册到selector上时,interestOps设置为0
// readInterestOp为16,所以此时interestOps & readInterestOp肯定会为0
if ((interestOps & readInterestOp) == 0) {
// 设置成监听OP_ACCEPT事件,为1 << 4,即16
selectionKey.interestOps(interestOps | readInterestOp);
}
}

调用栈为:

通过eventLoop来执行task的方式,其实是通过一个taskQueue来接收这些Runnable,然后eventLoop再通过统一调度,获取taskQueue里面的task,然后依次执行它们。所以这里还有最后一个问题:eventLoop是怎么启动起来的以及如何进行调度

EventLoop的启动与调度

时机:第一次在main线程中,调用EventLoop执行task的时候,会进行初始化。最早出现在initAndRegister()这个方法中,这个方法进行register()的时候,会将register0放到一个Runnable中,扔给eventLoop去执行,即:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
// AbstractChannel.java
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});

// SingleThreadEventExecutor.java
private void execute(Runnable task, boolean immediate) {
boolean inEventLoop = inEventLoop();
addTask(task);
if (!inEventLoop) {
startThread();
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
// ...
}
if (reject) {
reject();
}
}
}

if (!addTaskWakesUp && immediate) {
wakeup(inEventLoop);
}
}

private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
boolean success = false;
try {
doStartThread();
success = true;
} finally {
if (!success) {
STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
}
}
}
}
}

protected abstract void run();

private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
// 此时的线程为EventLoop
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}

boolean success = false;
updateLastExecutionTime();
try {
// 对EchoServer这个例子来说this为NioEventLoop
SingleThreadEventExecutor.this.run();
success = true;
}
// ...
}
});
}

下面的代码是对整个事件处理的核心:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
// NioEventLoop.java
@Override
protected void run() {
int selectCnt = 0;
for (;;) {
try {
int strategy;
try {
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;

case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO

case SelectStrategy.SELECT:
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
try {
if (!hasTasks()) {
strategy = select(curDeadlineNanos);
}
} finally {
// This update is just to help block unnecessary selector wakeups
// so use of lazySet is ok (no race condition)
nextWakeupNanos.lazySet(AWAKE);
}
// fall through
default:
}
} catch (IOException e) {
// If we receive an IOException here its because the Selector is messed up. Let's rebuild
// the selector and retry. https://github.com/netty/netty/issues/8566
rebuildSelector0();
selectCnt = 0;
handleLoopException(e);
continue;
}

selectCnt++;
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
boolean ranTasks;
if (ioRatio == 100) {
try {
if (strategy > 0) {
processSelectedKeys();
}
} finally {
// Ensure we always run tasks.
ranTasks = runAllTasks();
}
} else if (strategy > 0) {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
ranTasks = runAllTasks(0); // This will run the minimum number of tasks
}

if (ranTasks || strategy > 0) {
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
selectCnt = 0;
} else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
selectCnt = 0;
}
} catch (CancelledKeyException e) {
// Harmless exception - log anyway
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}

5 | Netty:服务端启动流程分析

https://eucham.me/2020/02/28/001684b38465.html

作者

遇寻

发布于

2020-02-28

更新于

2021-02-09

许可协议

评论