6 | Netty:新连接的建立

当服务端启动好了之后,也就是说,服务端已经在执行NioEventLoop的一个死循环方法run()中,一直轮询事件,并且此时的监听的事件为OP_ACCEPT。如果有新连接接入,那么首先会在上述的run()方法中触发…

收到新的连接

首先,服务端启动好了之后,会进入等待事件的状态,也就是调用JDK的NIO的API:

1
2
3
4
5
6
// NioEventLoop.java -> run()
if (!hasTasks()) {
strategy = select(curDeadlineNanos);
}
// 核心是调用jdk的api
selector.select();

收到新的连接后,将会通过processSelectedKeys()进行处理,处理内容包括:创建、初始化NioSocketChannel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// NioEventLoop.java
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}

private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
// 帮助GC
selectedKeys.keys[i] = null;
// attachment是NioServerSocketChannel
// 服务端注册selectionKey时传入
final Object a = k.attachment();

if (a instanceof AbstractNioChannel) {
// 此处真正进入处理新连接事件
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
//...
}
}

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
// ...
try {
// 此时的readyOps为OP_ACCEPT,也就是16,即二进制10000
int readyOps = k.readyOps();
// ...

// 处理新连接的逻辑开始
// SelectionKey.OP_READ | SelectionKey.OP_ACCEPT = 10001
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
// 此处的unsafe与创建连接时的unsafe不是同一个实现
unsafe.read();
}
}
// ...
}

创建、初始化NioSocketChannel

上面的unsafe的具体实现是在一个叫做NioMessageUnsafe的内部类中,在它的read方法中:

①创建了NioSocketChannel。②通过pipeline中的Handler,即ServerBootstrap$ServerBootstrapAcceptor中初始化NioSocketChannel。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// 
@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);

boolean closed = false;
Throwable exception = null;
try {
try {
do {
// ①创建了NioSocketChannel,并加入到readBuf这个List中
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}

allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}

int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
// ②在ServerBootstrap$ServerBootstrapAcceptor中初始化NioSocketChannel
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
// ...
}
// ...
}

创建

创建NioSocketChannel的主要流程,就是先通过调用JDK的API获取SocketChannel,然后再将其作为一个值传给NioSocketChannel。因此从另一个方面来看,可以理解成NioSocketChannel是SocketChannel的封装。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
// 实际调用时serverSocketChannel.accept()
SocketChannel ch = SocketUtils.accept(javaChannel());

try {
if (ch != null) {
// 创建NioSocketChannel
buf.add(new NioSocketChannel(this, ch));
return 1;
}
}
// ...
return 0;
}

初始化

主要依靠pipeline中的相应事件传递。比如说,将channel注册到EventLoop中这个事件,就是靠pipeline中的Handler,ServerBootstrapAcceptor来完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// 调用pipeline中的不可覆盖方法fireChannelRead
pipeline.fireChannelRead(readBuf.get(i));

// 与之前的流程类似,从pipeline中的头handler开始传递事件
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}

// 一个静态模板方法,掉用next的invokeChannelRead()方法
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}

// 还是一个模板方法,用于真实执行hander的read事件处理方法
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
// 触发handler的read事件,模板模式
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}

到这里,事件的往后续handler传递,都是调用上面的这个两个方法,来执行后续handler的相应read方法。此时pipeline中的handler有:

1
2
3
4
5
6
7
1) DefaultChannelPipeline$HeadContext

2) io.netty.handler.logging.LoggingHandler

3) io.netty.bootstrap.ServerBootstrap$ServerBootstrapAcceptor

4) DefaultChannelPipeline$TailContext

其中,head对read只是做简单的传递:

1
2
3
4
5
6
7
8
9
10
11
12
13
// DefaultChannelPipeline$HeadContext
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.fireChannelRead(msg);
}

// AbstractChannelHandlerContext.java
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
// 调用上面代码片段中的静态模板方法,实现事件传递
invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
return this;
}

LoggingHandler而言,简易打印日志,并往后传递事件:

1
2
3
4
5
6
7
8
// LoggingHandler.java
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (logger.isEnabled(internalLevel)) {
logger.log(internalLevel, format(ctx, "READ", msg));
}
ctx.fireChannelRead(msg);
}

ServerBootstrapAcceptor的read方法时,初始化便真正地开始了(此时的线程为bossGroup中的EventLoop):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// ServerBootstrapAcceptor
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
// 将ChannelInitializer添加到pipeline中,等执行完initial方法后,会被移除
child.pipeline().addLast(childHandler);
// 设置NioSocketChannel属性
setChannelOptions(child, childOptions, logger);
setAttributes(child, childAttrs);

try {
// 将NioSocketChannel绑定到一个workGroup中的NioEventLoop上
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}

register的过程与服务端启动时的绑定类似,先选出一个EventLoop,选的时候,有两种方式,根据不同的线程数,使用不同的选择方式。然后经过辗转,来到对register0()的执行,这个方法时主要的register操作。但是此时的线程是bossGroup中的EventLoop,而register0()会在workGroup中的线程中执行。所以会先将task放入队列中,然后启动线程,并进入NioEventLoop的run()死循环方法,通过不断遍历是否有已监听事件以及执行队列中的任务,最终来执行该task。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// MultithreadEventLoopGroup.java
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}

// SingleThreadEventLoop.java
@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}

@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}

// AbstractChannel.java
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
// ...
AbstractChannel.this.eventLoop = eventLoop;
// 此时的线程是bossGroup中的EventLoop,
// 此处的eventLoop则为上面分配的wordGroup中的线程。
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
// 所以会执行此方法,此时该eventLoop中的线程还未启动,会将此task放入队列中
// 然后会通过eventLoop.execute来启动线程,并进入NioEventLoop的run()方法
// 通过不断遍历是否有已监听事件以及执行队列中的任务,最终来执行该task。
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
}
// ...
}
}

此时,线程切换到workGroup中的EventLoop。主要执行好几个操作:先调用jdk的api,注册selectionKey;再发布相应的事件;最后修改interestOps为OP_READ。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private void register0(ChannelPromise promise) {
try {
// ...
boolean firstRegistration = neverRegistered;
// 调用jdk的api,注册selectionKey
doRegister();
neverRegistered = false;
registered = true;

// 处理ChannelInitializer,并移除掉它
pipeline.invokeHandlerAddedIfNeeded();
// 在服务端启动的时候,会以观察者模式调用操作完成的Listener
// doBind操作就是这样被封装到了其中,但处理客户端连接没有doBind操作
safeSetSuccess(promise);
// 从pipeline的head开始传递registered事件
pipeline.fireChannelRegistered();
// 此时已经被激活
if (isActive()) {
// 第一次进行register操作,被视为建立连接
if (firstRegistration) {
// 与服务端类似,会在此处将监听的事件改为OP_READ
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// 真的有数据来了
beginRead();
}
}
}
// ...
}

调用jdk的api,注册selectionKey

1
2
3
4
5
6
7
8
9
10
11
12
13
// AbstractNioChannel.java
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// 监听的事件为0,attachment是channel自己
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
}
// ...
}
}

对channle已激活的事件传递中,会将NioSocketChannel的interestOps修改为OP_READ。下面的代码是事件在pipeline中的传递,与上面的分析内容一致,在此不多赘述。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// DefaultChannelPipeline.java
@Override
public final ChannelPipeline fireChannelActive() {
AbstractChannelHandlerContext.invokeChannelActive(head);
return this;
}

// AbstractChannelHandlerContext.java
static void invokeChannelActive(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelActive();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelActive();
}
});
}
}

private void invokeChannelActive() {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelActive(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelActive();
}
}

唯一有区别的是:pipeline中的head,即HeadContext对active事件的处理方式,多了一块对interestOps的处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
// DefaultChannelPipeline.java $ HeadContext
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.fireChannelActive();
// 修改interestOps
readIfIsAutoRead();
}

private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
channel.read();
}
}

这里会在pipeline中传递read事件,但是是从tail开始,可以直接跳到TailContext的read()方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// AbstractChannel.java
@Override
public Channel read() {
pipeline.read();
return this;
}

// DefaultChannelPipeline.java
@Override
public final ChannelPipeline read() {
tail.read();
return this;
}

// AbstractChannelHandlerContext.java
@Override
public ChannelHandlerContext read() {
final AbstractChannelHandlerContext next = findContextOutbound(MASK_READ);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeRead();
} else {
Tasks tasks = next.invokeTasks;
if (tasks == null) {
next.invokeTasks = tasks = new Tasks(next);
}
executor.execute(tasks.invokeReadTask);
}

return this;
}

private void invokeRead() {
if (invokeHandler()) {
try {
((ChannelOutboundHandler) handler()).read(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
read();
}
}

// DefaultChannelPipeline.java
@Override
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
}

此处便到了修改的interestOps的主要逻辑处:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// AbstractChannel.java
@Override
public final void beginRead() {
assertEventLoop();

if (!isActive()) {
return;
}

try {
doBeginRead();
} catch (final Exception e) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireExceptionCaught(e);
}
});
close(voidPromise());
}
}

// AbstractNioChannel.java
@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}

readPending = true;

final int interestOps = selectionKey.interestOps();
// 此时interestOps为0,所以if中的条件一定会成立
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}

至此,对新建连接的处理基本完成。

作者

遇寻

发布于

2020-03-11

更新于

2021-02-09

许可协议

评论