Netty Source Code Study Series ④: Receiving Messages

I'm starting to doubt myself a little. Why can't I explain clearly how Netty works? These posts were basically typed out word by word by me, which shows, to some extent, that I really did understand at the time — so why can't I articulate it now? Looking back over my posts, I've indeed forgotten some details of certain flows; worse, some take me a long while to recall, and some I can't recall at all. I think the method is at fault: plain text notes without guiding diagrams don't help with understanding the overall flow.

Receiving a client's message clearly starts in the for loop of the EventLoop's run() method — the EventLoop (in the worker group, i.e. the Sub Reactor) that the connection was assigned to — where select() picks up an OP_READ event.


Update - 2020.3.23

I'm pausing the whole Netty series for now. What I should be studying at the moment is some deeper Spring knowledge — otherwise it feels like aiming high with shaky fundamentals. Better to get the web stack solidly understood first.

6 | Netty: Establishing a New Connection

Once the server has started — that is, it is already running NioEventLoop's endless run() method, continuously polling for events, with the interest set at OP_ACCEPT — a new incoming connection is first picked up inside that run() method…

Receiving a new connection

First, after startup the server enters a waiting-for-events state by calling the JDK NIO API:

// NioEventLoop.java -> run()
if (!hasTasks()) {
    strategy = select(curDeadlineNanos);
}
// at its core, this calls the JDK API:
selector.select();

When a new connection arrives, it is handled by processSelectedKeys(); the handling includes creating and initializing a NioSocketChannel.

// NioEventLoop.java
private void processSelectedKeys() {
    if (selectedKeys != null) {
        processSelectedKeysOptimized();
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

private void processSelectedKeysOptimized() {
    for (int i = 0; i < selectedKeys.size; ++i) {
        final SelectionKey k = selectedKeys.keys[i];
        // help GC
        selectedKeys.keys[i] = null;
        // the attachment is the NioServerSocketChannel,
        // passed in when the server registered its selectionKey
        final Object a = k.attachment();

        if (a instanceof AbstractNioChannel) {
            // this is where new-connection events are actually handled
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }
        //...
    }
}

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    // ...
    try {
        // readyOps here is OP_ACCEPT, i.e. 16, binary 10000
        int readyOps = k.readyOps();
        // ...

        // handling of the new connection starts here
        // SelectionKey.OP_READ | SelectionKey.OP_ACCEPT = 10001
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            // this unsafe is a different implementation from the one used when creating the connection
            unsafe.read();
        }
    }
    // ...
}

Creating and initializing the NioSocketChannel

The concrete unsafe implementation above lives in an inner class called NioMessageUnsafe; in its read() method:

① the NioSocketChannel is created. ② the NioSocketChannel is initialized by a handler in the pipeline, namely ServerBootstrap$ServerBootstrapAcceptor.

// AbstractNioMessageChannel.java$NioMessageUnsafe
@Override
public void read() {
    assert eventLoop().inEventLoop();
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.reset(config);

    boolean closed = false;
    Throwable exception = null;
    try {
        try {
            do {
                // ① create the NioSocketChannel and add it to the readBuf list
                int localRead = doReadMessages(readBuf);
                if (localRead == 0) {
                    break;
                }
                if (localRead < 0) {
                    closed = true;
                    break;
                }

                allocHandle.incMessagesRead(localRead);
            } while (allocHandle.continueReading());
        } catch (Throwable t) {
            exception = t;
        }

        int size = readBuf.size();
        for (int i = 0; i < size; i ++) {
            readPending = false;
            // ② initialize the NioSocketChannel in ServerBootstrap$ServerBootstrapAcceptor
            pipeline.fireChannelRead(readBuf.get(i));
        }
        readBuf.clear();
        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();
        // ...
    }
    // ...
}

Creation

The main flow for creating a NioSocketChannel is to obtain a SocketChannel via the JDK API and then pass it as a value into the NioSocketChannel. Seen from another angle, a NioSocketChannel can be understood as a wrapper around a SocketChannel.

@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    // effectively calls serverSocketChannel.accept()
    SocketChannel ch = SocketUtils.accept(javaChannel());

    try {
        if (ch != null) {
            // create the NioSocketChannel
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    }
    // ...
    return 0;
}

Initialization

It relies mainly on the corresponding events being propagated through the pipeline. For example, registering the channel with an EventLoop is done by a handler in the pipeline, the ServerBootstrapAcceptor.

// call the pipeline's non-overridable method fireChannelRead
pipeline.fireChannelRead(readBuf.get(i));

// similar to earlier flows: the event starts from the pipeline's head handler
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
}

// a static template method that calls next's invokeChannelRead()
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}

// another template method; this one actually runs the handler's read callback
private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
            // trigger the handler's read event (template pattern)
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRead(msg);
    }
}

From here on, passing the event to subsequent handlers always goes through the two methods above, which invoke each handler's corresponding read callback. At this point the handlers in the pipeline are:

1) DefaultChannelPipeline$HeadContext

2) io.netty.handler.logging.LoggingHandler

3) io.netty.bootstrap.ServerBootstrap$ServerBootstrapAcceptor

4) DefaultChannelPipeline$TailContext

Of these, head simply passes the read event along:

// DefaultChannelPipeline$HeadContext
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    ctx.fireChannelRead(msg);
}

// AbstractChannelHandlerContext.java
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
    // call the static template method from the snippet above to propagate the event
    invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
    return this;
}

As for LoggingHandler, it simply logs and passes the event on:

// LoggingHandler.java
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (logger.isEnabled(internalLevel)) {
        logger.log(internalLevel, format(ctx, "READ", msg));
    }
    ctx.fireChannelRead(msg);
}

When ServerBootstrapAcceptor's channelRead method runs, initialization truly begins (the current thread is still an EventLoop from the bossGroup):

// ServerBootstrapAcceptor
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;
    // add the ChannelInitializer to the pipeline; it is removed after its init method runs
    child.pipeline().addLast(childHandler);
    // set the NioSocketChannel's options and attributes
    setChannelOptions(child, childOptions, logger);
    setAttributes(child, childAttrs);

    try {
        // bind the NioSocketChannel to a NioEventLoop from the workerGroup
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}

The register flow is similar to the bind during server startup. First an EventLoop is chosen; there are two chooser strategies, selected according to the thread count. After some indirection we arrive at register0(), which does the main registration work. But the current thread is still a bossGroup EventLoop, while register0() must run on a workerGroup thread. So the task is first put on the queue, the worker thread is started, and it enters NioEventLoop's endless run() method, which keeps alternating between polling for ready events and draining the task queue, eventually executing this task.

// MultithreadEventLoopGroup.java
@Override
public ChannelFuture register(Channel channel) {
    return next().register(channel);
}

// SingleThreadEventLoop.java
@Override
public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}

@Override
public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
}

// AbstractChannel.java
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    // ...
    AbstractChannel.this.eventLoop = eventLoop;
    // the current thread is a bossGroup EventLoop,
    // while this eventLoop is the workerGroup thread assigned above,
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        // so this branch runs. The eventLoop's thread has not started yet: the task
        // is put on the queue, then eventLoop.execute starts the thread, which enters
        // NioEventLoop's run() and, by alternating between polling for ready events
        // and draining the task queue, eventually executes this task.
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        }
        // ...
    }
}

At this point execution switches to a workerGroup EventLoop, which performs several operations: first call the JDK API to register the SelectionKey; then fire the corresponding events; finally change interestOps to OP_READ.

private void register0(ChannelPromise promise) {
    try {
        // ...
        boolean firstRegistration = neverRegistered;
        // call the JDK API to register the selectionKey
        doRegister();
        neverRegistered = false;
        registered = true;

        // run the ChannelInitializer, then remove it
        pipeline.invokeHandlerAddedIfNeeded();
        // during server startup this notifies completion listeners observer-style;
        // that is how doBind ended up wrapped inside — but handling a client
        // connection involves no doBind step
        safeSetSuccess(promise);
        // propagate the registered event from the pipeline's head
        pipeline.fireChannelRegistered();
        // the channel is active by now
        if (isActive()) {
            // the first register is treated as connection establishment
            if (firstRegistration) {
                // as on the server side, the interest set is switched to OP_READ here
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                // data has actually arrived
                beginRead();
            }
        }
    }
    // ...
}

Calling the JDK API to register the selectionKey

// AbstractNioChannel.java
@Override
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            // the interest set is 0; the attachment is the channel itself
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        }
        // ...
    }
}

While the channelActive event is propagated, the NioSocketChannel's interestOps is changed to OP_READ. The code below shows the event travelling through the pipeline; it matches the analysis above, so I won't belabor it.

// DefaultChannelPipeline.java
@Override
public final ChannelPipeline fireChannelActive() {
    AbstractChannelHandlerContext.invokeChannelActive(head);
    return this;
}

// AbstractChannelHandlerContext.java
static void invokeChannelActive(final AbstractChannelHandlerContext next) {
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelActive();
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelActive();
            }
        });
    }
}

private void invokeChannelActive() {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelActive(this);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelActive();
    }
}

The only difference is how the pipeline's head, HeadContext, handles the active event: it additionally touches interestOps:

// DefaultChannelPipeline.java $ HeadContext
@Override
public void channelActive(ChannelHandlerContext ctx) {
    ctx.fireChannelActive();
    // modify interestOps
    readIfIsAutoRead();
}

private void readIfIsAutoRead() {
    if (channel.config().isAutoRead()) {
        channel.read();
    }
}

This propagates a read event through the pipeline, but starting from the tail, so we can jump straight to TailContext's read() method:

// AbstractChannel.java
@Override
public Channel read() {
    pipeline.read();
    return this;
}

// DefaultChannelPipeline.java
@Override
public final ChannelPipeline read() {
    tail.read();
    return this;
}

// AbstractChannelHandlerContext.java
@Override
public ChannelHandlerContext read() {
    final AbstractChannelHandlerContext next = findContextOutbound(MASK_READ);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeRead();
    } else {
        Tasks tasks = next.invokeTasks;
        if (tasks == null) {
            next.invokeTasks = tasks = new Tasks(next);
        }
        executor.execute(tasks.invokeReadTask);
    }

    return this;
}

private void invokeRead() {
    if (invokeHandler()) {
        try {
            ((ChannelOutboundHandler) handler()).read(this);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        read();
    }
}

// DefaultChannelPipeline.java
@Override
public void read(ChannelHandlerContext ctx) {
    unsafe.beginRead();
}

Here we reach the main logic that modifies interestOps:

// AbstractChannel.java
@Override
public final void beginRead() {
    assertEventLoop();

    if (!isActive()) {
        return;
    }

    try {
        doBeginRead();
    } catch (final Exception e) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireExceptionCaught(e);
            }
        });
        close(voidPromise());
    }
}

// AbstractNioChannel.java
@Override
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;

    final int interestOps = selectionKey.interestOps();
    // interestOps is 0 at this point, so the if condition is guaranteed to hold
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

At this point, handling of a new connection is essentially complete.

5 | Netty: Analyzing the Server Startup Flow

My current understanding of Netty: a framework that rounds out Java NIO. At the lowest level Netty still calls the JDK's NIO APIs, but it adds many layers of encapsulation on top of JDK NIO and derives its own set of concepts.

The main line of server startup

Taking EchoServer as the example, one reference main line for server startup looks like this:

main thread

  1. Create the selector
  2. Create the ServerSocketChannel
  3. Initialize the ServerSocketChannel
  4. Pick a NioEventLoop from the bossGroup for the ServerSocketChannel

boss thread

  1. Register the ServerSocketChannel with the chosen NioEventLoop's selector
  2. Bind the address and start
  3. Register interest in the accept event (OP_ACCEPT) on the selector

Mapped to code, the operations are, in order:

// 1. create the selector
Selector selector = sun.nio.ch.SelectorProviderImpl.openSelector();
// 2. create the ServerSocketChannel
ServerSocketChannel serverSocketChannel = provider.openServerSocketChannel();
// register the ServerSocketChannel with the chosen NioEventLoop's selector
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
// bind the address and start
javaChannel().bind(localAddress, config.getBacklog());
// register interest in the accept event (OP_ACCEPT) on the selector
selectionKey.interestOps(OP_ACCEPT);

Using the main line above as a reference — I find it too brief; many operations simply don't show up in it. So I split server creation into the following 3 larger steps. Below I analyze each of them as thoroughly as is reasonable, and connect the source code we've read before to server creation.

Creating the EventLoopGroups

The number of EventLoopGroups determines the concrete Reactor mode; EchoServer uses two EventLoopGroups, i.e. the multi-threaded main-sub Reactor mode. The type of EventLoopGroup determines the I/O model; here it is NioEventLoopGroup, i.e. the I/O model is NIO. The corresponding code in EchoServer:

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

Several questions arise here that the main line won't touch, since the main line cares only about the chain of operations. After getting the big picture, digging into each detail is what produces a thorough understanding. The questions below come from my own doubts.

What are EventLoopGroup and EventLoop?

My earlier rough understanding: an EventLoopGroup is a thread pool, and an EventLoop corresponds to a thread.

EventLoopGroup is an interface with several implementations; in EchoServer the implementation is NioEventLoopGroup.

The relationships among Channel, EventLoop and EventLoopGroup are as follows:

  • An EventLoopGroup contains one or more EventLoops;

  • an EventLoop is bound to exactly one Thread for its whole lifetime;

  • all I/O events handled by an EventLoop are processed on its dedicated Thread;

  • a Channel is registered with exactly one EventLoop during its lifetime;

  • an EventLoop may be assigned to one or more Channels.

Why does new NioEventLoopGroup(1) pass 1?

An earlier post analyzed why 1 is passed: for the boss group there is only one channel, so only 1 thread is bound — only 1 EventLoop is needed.

If no argument is passed, NioEventLoopGroup first passes a thread count of 0, and the parent class MultithreadEventLoopGroup substitutes the default when it sees 0:

DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
        "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

Obtaining the SelectorProvider

// in NioEventLoopGroup.java
public NioEventLoopGroup(ThreadFactory threadFactory) {
    this(0, threadFactory, SelectorProvider.provider());
}

// in SelectorProvider.java
public static SelectorProvider provider() {
    synchronized (lock) {
        if (provider != null)
            return provider;
        return AccessController.doPrivileged(
            new PrivilegedAction<SelectorProvider>() {
                public SelectorProvider run() {
                    if (loadProviderFromProperty())
                        return provider;
                    if (loadProviderAsService())
                        return provider;
                    provider = sun.nio.ch.DefaultSelectorProvider.create();
                    return provider;
                }
            });
    }
}

sun.nio.ch.DefaultSelectorProvider has different implementations in different platforms' JDKs — that is how Selector implementations are unified across platforms. The Windows implementation:

public class DefaultSelectorProvider {
    private DefaultSelectorProvider() {
    }

    public static SelectorProvider create() {
        return new WindowsSelectorProvider();
    }
}

Creating and initializing the EventExecutor array according to the thread count

MultithreadEventExecutorGroup的构造函数中,会初始化一个EventExecutor数组,其实这就是NioEventLoop数组。

children = new EventExecutor[nThreads];

for (int i = 0; i < nThreads; i ++) {
    boolean success = false;
    try {
        children[i] = newChild(executor, args);
        success = true;
    } catch (Exception e) {
        // TODO: Think about if this is a good exception type
        throw new IllegalStateException("failed to create a child event loop", e);
    } finally {
        if (!success) {
            // creating the EventLoopGroup failed; release resources
        }
    }
}
// chooserFactory is a factory class for creating a chooser.
chooser = chooserFactory.newChooser(children);

The chooser picks an EventLoop to handle events. There are two strategies: one for when the EventLoop count is a power of two (using an & mask) and a generic one (using the % operator). Honestly, I didn't really feel the difference between the two.
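To make the difference concrete, here is a minimal sketch of mine (not Netty code) showing why the two strategies return the same index, and why the & variant only works for power-of-two sizes:

// For a power-of-two length, (i & (length - 1)) equals (i % length) for
// non-negative i, and it stays well-behaved even after the int counter
// overflows to negative values -- which is why Netty special-cases it.
public class ChooserDemo {
    public static void main(String[] args) {
        int lengthPow2 = 8, lengthOdd = 7;
        for (int i : new int[] {0, 5, 13, 21}) {
            System.out.printf("i=%d  &: %d  %%: %d%n",
                    i, i & (lengthPow2 - 1), i % lengthPow2); // identical results
        }
        // with a non-power-of-two size the mask trick breaks, so % must be used:
        System.out.println((9 & (lengthOdd - 1)) + " vs " + (9 % lengthOdd)); // 0 vs 2
    }
}

So the two choosers are interchangeable in result; the masked one merely saves the division.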

For the newChild() method, we find the corresponding NioEventLoopGroup implementation:
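The snippet was lost here; on the 4.1 branch, newChild() looks roughly like this (the exact argument unpacking varies slightly between minor versions):

// NioEventLoopGroup.java (4.1, approximate)
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    return new NioEventLoop(this, executor,
            (SelectorProvider) args[0],                          // the provider obtained above
            ((SelectStrategyFactory) args[1]).newSelectStrategy(),
            (RejectedExecutionHandler) args[2]);
}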

In other words, all NioEventLoops are fully initialized at the time the NioEventLoopGroup is initialized.

Initializing a NioEventLoop

  1. The openSelector() call — which shows the selector is opened while the NioEventLoop is being initialized.

    final SelectorTuple selectorTuple = openSelector();

    private SelectorTuple openSelector() {
        final Selector unwrappedSelector;
        try {
            unwrappedSelector = provider.openSelector();
        } catch (IOException e) {
            throw new ChannelException("failed to open a new selector", e);
        }
        ...
    }
  2. Initialize an executor that wraps each Runnable.

    this.executor = ThreadExecutorMap.apply(executor, this);

    public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {
        ObjectUtil.checkNotNull(executor, "executor");
        ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
        return new Executor() {
            @Override
            public void execute(final Runnable command) {
                executor.execute(apply(command, eventExecutor));
            }
        };
    }

    // wraps the Runnable, then returns the wrapped Runnable
    public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) {
        ObjectUtil.checkNotNull(command, "command");
        ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
        return new Runnable() {
            @Override
            public void run() {
                setCurrentEventExecutor(eventExecutor);
                try {
                    command.run();
                } finally {
                    setCurrentEventExecutor(null);
                }
            }
        };
    }

Setting parameters via the Bootstrap

Binding the port and starting

This step contains several important stages.

Creating the NioServerSocketChannel

// specify the channel type as NioServerSocketChannel
b.group(bossGroup, workerGroup)
 .channel(NioServerSocketChannel.class)

// set a factory that produces channels
public B channel(Class<? extends C> channelClass) {
    return channelFactory(new ReflectiveChannelFactory<C>(
            ObjectUtil.checkNotNull(channelClass, "channelClass")
    ));
}

public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) {
    return channelFactory((ChannelFactory<C>) channelFactory);
}

public B channelFactory(ChannelFactory<? extends C> channelFactory) {
    ObjectUtil.checkNotNull(channelFactory, "channelFactory");
    if (this.channelFactory != null) {
        throw new IllegalStateException("channelFactory set already");
    }
    // subsequent channels are produced by this channelFactory
    this.channelFactory = channelFactory;
    return self();
}

The channelFactory here is a ReflectiveChannelFactory. Its implementation is simple: obtain the constructor from the class, then create new instances through that constructor. The gist:

public ReflectiveChannelFactory(Class<? extends T> clazz) {
    ObjectUtil.checkNotNull(clazz, "clazz");
    try {
        this.constructor = clazz.getConstructor();
    } catch (NoSuchMethodException e) {
        throw new IllegalArgumentException(...);
    }
}

@Override
public T newChannel() {
    try {
        return constructor.newInstance();
    } catch (Throwable t) {
        throw new ChannelException(...);
    }
}

After the channelFactory is initialized we enter the doBind() phase, where a new object — the NioServerSocketChannel — is created via the channelFactory:

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        channel = channelFactory.newChannel();
        // ...
    }
    // ...
}

When the NioServerSocketChannel is created, a ServerSocketChannel is created along with it; the call chain is:

// see the earlier section for SelectorProvider.provider()
SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent, ch, readInterestOp);
}

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        ch.configureBlocking(false);
    } catch (IOException e) {
        // ...
    }
}

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    // attach a default pipeline of type DefaultChannelPipeline
    pipeline = newChannelPipeline();
}

protected DefaultChannelPipeline newChannelPipeline() {
    return new DefaultChannelPipeline(this);
}

Initializing the freshly created NioServerSocketChannel

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        channel = channelFactory.newChannel();
        init(channel);
        // ...
    }
    // ...
}

init()中,主要是给新创建的NioServerSocketChannel设置参数,然后在它的pipeline上面添加一个ChannelHandler,用来处理新建立的连接,也就是ServerBootstrapAcceptor

@Override
void init(Channel channel) {
    // ...
    ChannelPipeline p = channel.pipeline();
    // ...
    // when p.addLast runs here, the channel is not yet registered with an
    // event loop, so this ChannelInitializer is saved in a ctx, the ctx is
    // wrapped into a PendingHandlerAddedTask and added to the pipeline,
    // to be fired after registration with the event loop
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

When p.addLast() executes, if the current channel — the NioServerSocketChannel — has not been registered with an EventLoop, the handler is stashed, in the form of a PendingHandlerCallback, on the pipeline's pendingHandlerCallbackHead linked list.

A PendingHandlerCallback can be understood simply as a Runnable that holds an AbstractChannelHandlerContext and forms a singly linked list. Its whole purpose is to wait to be executed at some later point.

Registering the channel with the EventLoop

ChannelFuture regFuture = config().group().register(channel);开始,一路可以追踪到AbstractChannelregister方法里面。刚开始有一段状态判断的代码,这段代码可以印证前面的一个说法:一个channel只能注册到一个eventloop

Then register0() is executed on the EventLoop's thread — meaning the main thread returns at this point.

// continuing from the code above
if (eventLoop.inEventLoop()) {
    register0(promise);
} else {
    try {
        eventLoop.execute(new Runnable() {
            @Override
            public void run() {
                register0(promise);
            }
        });
    }
    // ...
}

That is, when initAndRegister() returns we get a ChannelFuture:

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        channel = channelFactory.newChannel();
        init(channel);
    }
    // ...
    ChannelFuture regFuture = config().group().register(channel);
    // ...
    return regFuture;
}

If register0() has already finished on the EventLoop, doBind0() proceeds on the main thread; otherwise doBind0() runs only after register0() completes on the EventLoop — and in that case on the EventLoop thread.

private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    // ...
    if (regFuture.isDone()) {
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                //...
                doBind0(regFuture, channel, localAddress, promise);
            }
        });
        return promise;
    }
}

At this point, b.bind(PORT) from EchoServer's ChannelFuture f = b.bind(PORT).sync() has finished its work on the main thread. So the current step splits further: 1. register0; 2. doBind0.

1. register0

The flow of this function's key operations:

// AbstractChannel.java
private void register0(ChannelPromise promise) {
    try {
        // ...
        // a method implemented by subclasses; here it is AbstractNioChannel's, i.e.:
        // selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
        doRegister();
        neverRegistered = false;
        // the channel is now registered with an EventLoop; registering it
        // with another EventLoop afterwards will fail
        registered = true;

        // run the PendingHandlerCallbacks queued while no EventLoop was bound;
        // see the PendingHandlerCallback flow below for details
        pipeline.invokeHandlerAddedIfNeeded();

        // after setting the promise's completion state, this notifies all
        // ChannelFutureListeners (observer pattern), which carries on into doBind0()
        safeSetSuccess(promise);
        pipeline.fireChannelRegistered();
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                // This channel was registered before and autoRead() is set. This means we need to begin read
                // again so that we process inbound data.
                //
                // See https://github.com/netty/netty/issues/4805
                beginRead();
            }
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}
The flow of executing PendingHandlerCallbacks
// DefaultChannelPipeline.java
final void invokeHandlerAddedIfNeeded() {
    assert channel.eventLoop().inEventLoop();
    if (firstRegistration) {
        firstRegistration = false;
        // the Channel is now registered with an EventLoop; run the handlerAdded callbacks
        callHandlerAddedForAllHandlers();
    }
}

// DefaultChannelPipeline.java
private void callHandlerAddedForAllHandlers() {
    final PendingHandlerCallback pendingHandlerCallbackHead;
    synchronized (this) {
        assert !registered;
        // set the registered flag
        registered = true;
        // grab the previous pendingHandlerCallbackHead and null the field
        pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
        // help GC
        this.pendingHandlerCallbackHead = null;
    }

    // This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while
    // holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside
    // the EventLoop.
    PendingHandlerCallback task = pendingHandlerCallbackHead;
    // walk the list, executing every pendingHandlerCallback in turn
    while (task != null) {
        task.execute();
        task = task.next;
    }
}

PendingHandlerAddedTask's structure was described earlier; here is its implementation. Ultimately, task.execute() ends up calling callHandlerAdded0(ctx).

// DefaultChannelPipeline.java
private abstract static class PendingHandlerCallback implements Runnable {
    // holds the ctx
    final AbstractChannelHandlerContext ctx;
    // singly linked list
    PendingHandlerCallback next;

    PendingHandlerCallback(AbstractChannelHandlerContext ctx) {
        this.ctx = ctx;
    }

    abstract void execute();
}

private final class PendingHandlerAddedTask extends PendingHandlerCallback {
    // ...
    @Override
    public void run() {
        callHandlerAdded0(ctx);
    }

    @Override
    void execute() {
        EventExecutor executor = ctx.executor();
        // the executing thread here is the eventLoop
        if (executor.inEventLoop()) {
            callHandlerAdded0(ctx);
        } else {
            try {
                executor.execute(this);
            } catch (RejectedExecutionException e) {
                // ...
                atomicRemoveFromHandlerList(ctx);
                ctx.setRemoved();
            }
        }
    }
}

After the ChannelInitializer has run, it is removed from the pipeline.

// DefaultChannelPipeline.java
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
    try {
        ctx.callHandlerAdded();
    } catch (Throwable t) {
        boolean removed = false;
        try {
            atomicRemoveFromHandlerList(ctx);
            ctx.callHandlerRemoved();
            removed = true;
        }
        // ...
    }
}

// AbstractChannelHandlerContext.java
final void callHandlerAdded() throws Exception {
    // We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates
    // any pipeline events ctx.handler() will miss them because the state will not allow it.
    // (this comment deserves a closer look later)
    if (setAddComplete()) {
        // the handler here is the ChannelInitializer
        handler().handlerAdded(this);
    }
}

// ChannelInitializer.java
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    if (ctx.channel().isRegistered()) {
        // This should always be true with our current DefaultChannelPipeline implementation.
        // The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering
        // surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers
        // will be added in the expected order.
        if (initChannel(ctx)) {

            // We are done with init the Channel, removing the initializer now.
            removeState(ctx);
        }
    }
}

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    if (initMap.add(ctx)) { // Guard against re-entrance.
        try {
            // invoke initChannel()
            initChannel((C) ctx.channel());
        } catch (Throwable cause) {
            // ...
        } finally {
            ChannelPipeline pipeline = ctx.pipeline();
            if (pipeline.context(this) != null) {
                // remove the ChannelInitializer
                pipeline.remove(this);
            }
        }
        return true;
    }
    return false;
}

protected abstract void initChannel(C ch) throws Exception;

Now look again at how that initChannel was implemented:

// ServerBootstrap.java
p.addLast(new ChannelInitializer<Channel>() {
    @Override
    public void initChannel(final Channel ch) {
        final ChannelPipeline pipeline = ch.pipeline();
        ChannelHandler handler = config.handler();
        if (handler != null) {
            pipeline.addLast(handler);
        }

        ch.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                pipeline.addLast(new ServerBootstrapAcceptor(
                        ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
            }
        });
    }
});

Will the ServerBootstrapAcceptor in pipeline.addLast(new ServerBootstrapAcceptor...) also be removed, like the ChannelInitializer?

Intuitively, no — if it were removed, who would handle newly established connections? So why isn't it removed? Because ServerBootstrapAcceptor does not override handlerAdded(); it uses the parent class's default implementation, which does nothing. Both it and ChannelInitializer extend ChannelInboundHandlerAdapter, but ChannelInitializer overrides handlerAdded() and, after initializing, removes itself from the pipeline there — it only ever needs to run once.
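For reference, the inherited default really is an empty method — a minimal look at the adapter (in 4.1 it is carried by ChannelHandlerAdapter, which ChannelInboundHandlerAdapter extends):

// ChannelHandlerAdapter.java -- the default handlerAdded() does nothing,
// so a subclass that does not override it (like ServerBootstrapAcceptor)
// is never removed from the pipeline by this mechanism.
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    // NOOP
}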

safeSetSuccess(promise)

Using the observer pattern, the ChannelFutureListeners previously registered on the ChannelFuture are now executed — which is what carries on into doBind0(). This code is fairly clear; the key fragments suffice:

A classic observer pattern, textbook through and through.

// with multiple listeners, notifyListener0() is called in a loop
private void notifyListeners0(DefaultFutureListeners listeners) {
    GenericFutureListener<?>[] a = listeners.listeners();
    int size = listeners.size();
    for (int i = 0; i < size; i ++) {
        notifyListener0(this, a[i]);
    }
}

// executed for a single listener
private static void notifyListener0(Future future, GenericFutureListener l) {
    try {
        l.operationComplete(future);
    } catch (Throwable t) {
        //...
    }
}

Executing this ChannelFutureListener takes us into the doBind0() phase. It doesn't do much: it wraps the actual bind in a Runnable and throws it onto the eventLoop's taskQueue, to be executed on the loop's next pass.

// AbstractBootstrap.java
regFuture.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        Throwable cause = future.cause();
        if (cause != null) {
            // ...
        } else {
            // Registration was successful, so set the correct executor to use.
            // See https://github.com/netty/netty/issues/2586
            promise.registered();

            doBind0(regFuture, channel, localAddress, promise);
        }
    }
});

private static void doBind0(...) {
    // This method is invoked before channelRegistered() is triggered.
    // Give user handlers a chance to set up the pipeline
    // in its channelRegistered() implementation.
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}
fireChannelRegistered

Event propagation can wait for a later module on message sending after a connection is established. For now, let me raise a few of my questions:

  • How are events propagated through the pipeline?
  • What is executionMask? The implementation looks interesting: ANDing a handler's executionMask with the event's bit code tells whether this handler can handle the event. Worth a deeper analysis later; a first taste is sketched below.
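As that first taste, the lookup that consumes the mask is short. A sketch close to the 4.1 source (simplified; newer revisions add an extra executor check in the loop):

// AbstractChannelHandlerContext.java (simplified): walk forward through the
// pipeline until a context whose executionMask contains the event's bit.
private AbstractChannelHandlerContext findContextInbound(int mask) {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.next;
    } while ((ctx.executionMask & mask) == 0);  // skip handlers that can't handle this event
    return ctx;
}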

At this point the register0 operation is complete.

2. doBind0

The framework code's flow: starting from the pipeline's tail, fire the bind event.

// AbstractBootstrap.java
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);

// AbstractChannel.java
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return pipeline.bind(localAddress, promise);
}

// DefaultChannelPipeline.java
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return tail.bind(localAddress, promise);
}

Starting from tail, the next outbound handler found is the LoggingHandler — which means a BIND log line gets printed:

// AbstractChannelHandlerContext.java
@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
    // ...
    // starting from tail (DefaultChannelPipeline#TailContext), the bind event travels toward the head
    final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
    // tail's predecessor is the LoggingHandler
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeBind(localAddress, promise);
    } else {
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                next.invokeBind(localAddress, promise);
            }
        }, promise, null, false);
    }
    return promise;
}

private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
    if (invokeHandler()) {
        try {
            // run LoggingHandler's bind method
            ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    } else {
        bind(localAddress, promise);
    }
}

// LoggingHandler.java
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
    if (logger.isEnabled(internalLevel)) {
        logger.log(internalLevel, format(ctx, "BIND", localAddress));
    }
    // keep propagating the bind event -- it must be passed on, or things break
    ctx.bind(localAddress, promise);
}

From here we loop back into the same bind() code path, until invokeBind() jumps to LoggingHandler's predecessor, DefaultChannelPipeline#HeadContext — which holds the main flow of the bind operation:

// DefaultChannelPipeline#HeadContext
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
    unsafe.bind(localAddress, promise);
}

// AbstractChannel.java
protected abstract void doBind(SocketAddress localAddress) throws Exception;

@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    // ...
    boolean wasActive = isActive();
    try {
        // the actual bind: calls the JDK API
        doBind(localAddress);
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }

    if (!wasActive && isActive()) {
        // merely adds this Runnable to the taskQueue, to be run on a later pass
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireChannelActive();
            }
        });
    }

    safeSetSuccess(promise);
}

// NioServerSocketChannel.java
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}

Call stack:

At this point the bind operation is complete.

3. fireChannelActive

fireChannelRegistered的调用类似,也是从pipeline的head开始触发,只是事件的名称换了,并且多了一个设置OP_ACCEPT的操作。

// DefaultChannelPipeline.java
final class HeadContext extends AbstractChannelHandlerContext
        implements ChannelOutboundHandler, ChannelInboundHandler {
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        // pass on to the handlers after head in the pipeline
        ctx.fireChannelActive();
        // set the OP_ACCEPT interest
        readIfIsAutoRead();
    }
    //...
}

private void readIfIsAutoRead() {
    if (channel.config().isAutoRead()) {
        channel.read();
    }
}

// AbstractChannel.java
@Override
public Channel read() {
    pipeline.read();
    return this;
}

// DefaultChannelPipeline.java
@Override
public final ChannelPipeline read() {
    // tail is the TailContext
    tail.read();
    return this;
}

// AbstractChannelHandlerContext.java
@Override
public ChannelHandlerContext read() {
    // next is the HeadContext here, i.e. head
    final AbstractChannelHandlerContext next = findContextOutbound(MASK_READ);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) { // the executing thread is the eventLoop
        next.invokeRead();
    } else {
        // ...
    }

    return this;
}

private void invokeRead() {
    if (invokeHandler()) {
        try {
            ((ChannelOutboundHandler) handler()).read(this);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        read();
    }
}

// DefaultChannelPipeline.java$HeadContext
@Override
public void read(ChannelHandlerContext ctx) {
    unsafe.beginRead();
}

// AbstractChannel.java$AbstractUnsafe
@Override
public final void beginRead() {
    assertEventLoop();

    if (!isActive()) {
        return;
    }

    try {
        doBeginRead();
    } catch (final Exception e) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireExceptionCaught(e);
            }
        });
        close(voidPromise());
    }
}

// AbstractNioMessageChannel.java
@Override
protected void doBeginRead() throws Exception {
    if (inputShutdown) {
        return;
    }
    super.doBeginRead();
}

// AbstractNioChannel.java
@Override
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;

    final int interestOps = selectionKey.interestOps();
    // interestOps was set to 0 when the channel was registered with the selector;
    // readInterestOp is 16, so interestOps & readInterestOp is guaranteed to be 0 here
    if ((interestOps & readInterestOp) == 0) {
        // switch to listening for the OP_ACCEPT event, which is 1 << 4, i.e. 16
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

The call stack:

The way the eventLoop executes tasks is via a taskQueue that receives these Runnables; the eventLoop then schedules uniformly, pulling tasks off the taskQueue and executing them one by one. Which leaves one final question here: how does the eventLoop get started, and how does it schedule?

Starting and scheduling the EventLoop

Timing: the first time the main thread asks an EventLoop to execute a task, initialization happens. The earliest occurrence is in initAndRegister(): during register(), register0 is wrapped in a Runnable and handed to the eventLoop to execute, i.e.:

// AbstractChannel.java
eventLoop.execute(new Runnable() {
    @Override
    public void run() {
        register0(promise);
    }
});

// SingleThreadEventExecutor.java
private void execute(Runnable task, boolean immediate) {
    boolean inEventLoop = inEventLoop();
    addTask(task);
    if (!inEventLoop) {
        startThread();
        if (isShutdown()) {
            boolean reject = false;
            try {
                if (removeTask(task)) {
                    reject = true;
                }
            } catch (UnsupportedOperationException e) {
                // ...
            }
            if (reject) {
                reject();
            }
        }
    }

    if (!addTaskWakesUp && immediate) {
        wakeup(inEventLoop);
    }
}

private void startThread() {
    if (state == ST_NOT_STARTED) {
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            boolean success = false;
            try {
                doStartThread();
                success = true;
            } finally {
                if (!success) {
                    STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                }
            }
        }
    }
}

protected abstract void run();

private void doStartThread() {
    assert thread == null;
    executor.execute(new Runnable() {
        @Override
        public void run() {
            // the current thread here is the EventLoop
            thread = Thread.currentThread();
            if (interrupted) {
                thread.interrupt();
            }

            boolean success = false;
            updateLastExecutionTime();
            try {
                // for the EchoServer example, this is the NioEventLoop
                SingleThreadEventExecutor.this.run();
                success = true;
            }
            // ...
        }
    });
}

The following code is the core of the whole event handling:

// NioEventLoop.java
@Override
protected void run() {
    int selectCnt = 0;
    for (;;) {
        try {
            int strategy;
            try {
                strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                switch (strategy) {
                case SelectStrategy.CONTINUE:
                    continue;

                case SelectStrategy.BUSY_WAIT:
                    // fall-through to SELECT since the busy-wait is not supported with NIO

                case SelectStrategy.SELECT:
                    long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                    if (curDeadlineNanos == -1L) {
                        curDeadlineNanos = NONE; // nothing on the calendar
                    }
                    nextWakeupNanos.set(curDeadlineNanos);
                    try {
                        if (!hasTasks()) {
                            strategy = select(curDeadlineNanos);
                        }
                    } finally {
                        // This update is just to help block unnecessary selector wakeups
                        // so use of lazySet is ok (no race condition)
                        nextWakeupNanos.lazySet(AWAKE);
                    }
                    // fall through
                default:
                }
            } catch (IOException e) {
                // If we receive an IOException here its because the Selector is messed up. Let's rebuild
                // the selector and retry. https://github.com/netty/netty/issues/8566
                rebuildSelector0();
                selectCnt = 0;
                handleLoopException(e);
                continue;
            }

            selectCnt++;
            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            boolean ranTasks;
            if (ioRatio == 100) {
                try {
                    if (strategy > 0) {
                        processSelectedKeys();
                    }
                } finally {
                    // Ensure we always run tasks.
                    ranTasks = runAllTasks();
                }
            } else if (strategy > 0) {
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    final long ioTime = System.nanoTime() - ioStartTime;
                    ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            } else {
                ranTasks = runAllTasks(0); // This will run the minimum number of tasks
            }

            if (ranTasks || strategy > 0) {
                if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                            selectCnt - 1, selector);
                }
                selectCnt = 0;
            } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
                selectCnt = 0;
            }
        } catch (CancelledKeyException e) {
            // Harmless exception - log anyway
            if (logger.isDebugEnabled()) {
                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                        selector, e);
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        // Always handle shutdown even if the loop processing threw an exception.
        try {
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}

4 | Netty: A Java NIO Overview

Before formally starting on Netty, I decided to first review Java NIO — at minimum to understand the NIO concepts such as Channel, ByteBuffer and Selector, to write some demos by hand, and to dig as far as reasonable into how things are implemented underneath, i.e. read a bit of the relevant JDK source.

Java NIO consists of these core parts: Buffer, Channel, Selector. Traditional I/O is stream-oriented: a stream-oriented I/O system processes data one byte at a time — you read one or more bytes from the stream until done, and the data is not cached anywhere. NIO is buffer-oriented (block-oriented): data is read from a Channel into a Buffer, and the data is then processed in the Buffer.

Buffer

Think of it as the cart in a coal mine, hauling coal from the pit bottom up to the surface. Its fields and subclasses are as follows:

Buffer is an abstract class extending Object, with multiple subclasses. The JDK source documents this class as follows:

A container for data of a specific primitive type.

A buffer is a linear, finite sequence of elements of a specific primitive type. Aside from its content, the essential properties of a buffer are its capacity, limit, and position:

  • A buffer’s capacity is the number of elements it contains. The capacity of a buffer is never negative and never changes.

  • A buffer’s limit is the index of the first element that should not be read or written. A buffer’s limit is never negative and is never greater than its capacity.

    In write mode, limit is the maximum amount of data that can be written into the Buffer, equal to capacity; in read mode, limit is the maximum amount that can be read, at most equal to capacity.

  • A buffer’s position is the index of the next element to be read or written. A buffer’s position is never negative and is never greater than its limit.

There is one subclass of this class for each non-boolean primitive type.

Transferring data

Each subclass of this class defines two categories of get and put operations:

  • Relative operations read or write one or more elements starting at the current position and then increment the position by the number of elements transferred. If the requested transfer exceeds the limit then a relative get operation throws a BufferUnderflowException and a relative put operation throws a BufferOverflowException; in either case, no data is transferred.

  • Absolute operations take an explicit element index and do not affect the position. Absolute get and put operations throw an IndexOutOfBoundsException if the index argument exceeds the limit.

Data may also, of course, be transferred in to or out of a buffer by the I/O operations of an appropriate channel, which are always relative to the current position.

Marking and resetting

A buffer’s mark is the index to which its position will be reset when the reset method is invoked. The mark is not always defined, but when it is defined it is never negative and is never greater than the position. If the mark is defined then it is discarded when the position or the limit is adjusted to a value smaller than the mark. If the mark is not defined then invoking the reset method causes an InvalidMarkException to be thrown.

Invariants

The following invariant holds for the mark, position, limit, and capacity values:

0 <= mark <= position <= limit <= capacity

A newly-created buffer always has a position of zero and a mark that is undefined. The initial limit may be zero, or it may be some other value that depends upon the type of the buffer and the manner in which it is constructed. Each element of a newly-allocated buffer is initialized to zero.

Clearing, flipping, and rewinding

In addition to methods for accessing the position, limit, and capacity values and for marking and resetting, this class also defines the following operations upon buffers:

  • clear() makes a buffer ready for a new sequence of channel-read or relative put operations: It sets the limit to the capacity and the position to zero.

  • flip() makes a buffer ready for a new sequence of channel-write or relative get operations: It sets the limit to the current position and then sets the position to zero.

  • rewind() makes a buffer ready for re-reading the data that it already contains: It leaves the limit unchanged and sets the position to zero.

Read-only buffers

Every buffer is readable, but not every buffer is writable. The mutation methods of each buffer class are specified as optional operations that will throw a ReadOnlyBufferException when invoked upon a read-only buffer. A read-only buffer does not allow its content to be changed, but its mark, position, and limit values are mutable. Whether or not a buffer is read-only may be determined by invoking its isReadOnly method.

Thread safety

Buffers are not safe for use by multiple concurrent threads. If a buffer is to be used by more than one thread then access to the buffer should be controlled by appropriate synchronization.

Invocation chaining

Methods in this class that do not otherwise have a value to return are specified to return the buffer upon which they are invoked. This allows method invocations to be chained; for example, the sequence of statements

b.flip();
b.position(23);
b.limit(42);

can be replaced by the single, more compact statement
b.flip().position(23).limit(42);

The clear() method

// clears the Buffer's bookkeeping: only resets the fields to their defaults
public final Buffer clear() {
    position = 0;
    limit = capacity;
    mark = -1;
    return this;
}

The flip() method

// records the current position as limit and points the cursor at the start,
// in preparation for reading
public final Buffer flip() {
    limit = position;
    position = 0;
    mark = -1;
    return this;
}

The rewind() method

// points the cursor at the start, for re-reading
public final Buffer rewind() {
    position = 0;
    mark = -1;
    return this;
}

Reading a file with Java NIO

I hit a pitfall here, but working through it should deepen the understanding of these Java NIO concepts. The problematic code:

public static void fileChannel() throws IOException {
    FileInputStream fis = new FileInputStream("/Users/yangyu/Documents/data.json");
    FileChannel fileChannel = fis.getChannel();
    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
    while ((fileChannel.read(byteBuffer)) != -1) {
        while (byteBuffer.hasRemaining()) {
            System.out.print((char) byteBuffer.get());
        }
    }
}

The code above reads no data — it keeps looping without printing anything. Why? Because hasRemaining() compares position against limit:

public final boolean hasRemaining() {
    return position < limit;
}

After reading from fileChannel into byteBuffer, limit equals capacity (as at initialization) and position has also advanced to capacity, so hasRemaining() is false and nothing is printed to the console.

So position needs to be reset to 0, so that reading starts from the beginning and runs up to what was just filled — which is exactly what flip() is for, i.e.:
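The snippet itself is missing here; given the narrative, the change was presumably just adding a flip() before draining the buffer (note there is still no clear(), which matters below):

while (fileChannel.read(byteBuffer) != -1) {
    byteBuffer.flip(); // limit = position, position = 0: ready for reading
    while (byteBuffer.hasRemaining()) {
        System.out.print((char) byteBuffer.get());
    }
    // no clear()/compact() here -- the cause of the endless output described next
}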

But then the console printed endlessly. To figure out why, I shrank the byteBuffer to size 8; the output looked like this:

Why is that? The problem must relate to byteBuffer's internal fields:

My guess was that it relates to the concrete implementation of fileChannel.read(byteBuffer). Skimming that implementation, the rough flow:

  1. Compute byteBuffer's remaining space, i.e. limit - position. In the situation above, the remaining space is 0.

  2. Find the cached buffer, left over from the previous read; the first time there is none, and one is allocated directly. On the second read its three properties are all 8, i.e. the result of the previous read.

  3. rewind() the cached buffer and flip it to the remaining count, yielding a buffer of [pos=0, limit=0, capacity=8].

  4. The amount that can be read is determined from the cached buffer's pos and limit, i.e.:

    // var1 is the cached buffer
    int var5 = var1.position();
    int var6 = var1.limit();

    assert var5 <= var6;
    // var6 - var5 = limit - position = 0
    int var7 = var5 <= var6 ? var6 - var5 : 0;
    // var7 = 0
    if (var7 == 0) {
        // 0 is the number of bytes read
        return 0;
    }
  5. If data was read, the cached buffer's content is transferred into the byteBuffer (the ByteBuffer we passed to read()):

    // var5 is the cached buffer; read content into var5
    int var6 = readIntoNativeBuffer(var0, var5, var2, var4);
    // prepare it for reading
    var5.flip();
    // var1 is the byteBuffer we passed in; if more than 0 bytes were read,
    if (var6 > 0) {
        // copy var5's content into var1
        var1.put(var5);
    }

Until I found this passage in the javadoc of compact():

Compacts this buffer (optional operation).
The bytes between the buffer’s current position and its limit, if any, are copied to the beginning of the buffer. That is, the byte at index p = position() is copied to index zero, the byte at index p + 1 is copied to index one, and so forth until the byte at index limit() - 1 is copied to index n = limit() - 1 - p. The buffer’s position is then set to n+1 and its limit is set to its capacity. The mark, if defined, is discarded.

The buffer’s position is set to the number of bytes copied, rather than to zero, so that an invocation of this method can be followed immediately by an invocation of another relative put method.

Invoke this method after writing data from a buffer in case the write was incomplete. The following loop, for example, copies bytes from one channel to another via the buffer buf:

buf.clear();          // Prepare buffer for use
while (in.read(buf) >= 0 || buf.position != 0) {
    buf.flip();
    out.write(buf);
    buf.compact();    // In case of partial write
}

Adding that buf.compact() call makes the file read correctly. By this point I'm a bit weary — why does even reading a file have so many pitfalls? The takeaway: whenever something seems off, think about those three fields.

Channel

The tunnels that haul the coal in the mine. Four subclasses are worth looking at:

  • FileChannel: file channel, for reading and writing files. Does not support non-blocking mode.
  • DatagramChannel: for receiving and sending over UDP connections.
  • SocketChannel: think of it as a TCP connection channel — simply put, a TCP client.
  • ServerSocketChannel: the TCP server counterpart, listening for requests arriving on a port.

Selector

Only code you've written yourself leaves a deep impression — even if it's copied from elsewhere, debug it slowly, find your own questions about it, then work those out; I find that gives me a much better grasp. The two snippets below largely follow online tutorials, mostly copied, but with my own touches — and I hit one problem, plus one open question.

Client code

The client code is simple: ① read standard input. ② send it to the server.

// client-side code is almost boilerplate; this pattern just works
public static void main(String[] args) throws Exception {
    SocketChannel sc = SocketChannel.open();
    sc.configureBlocking(false);
    sc.connect(new InetSocketAddress("127.0.0.1", 8086));

    Scanner scanner = new Scanner(System.in);
    if (sc.finishConnect()) {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        while (scanner.hasNextLine()) {
            // read standard input
            String info = scanner.nextLine();
            buffer.clear();
            buffer.put(info.getBytes());
            buffer.flip();
            while (buffer.hasRemaining()) {
                System.out.println(buffer);
                // send to the server
                sc.write(buffer);
            }
        }
    }
}

Server code

I mainly referenced a blog post on CSDN and one on Jianshu; the Jianshu one helped me a lot — many thanks. My issues were two: first, a missing it.remove(); second, how to trigger the SelectionKey.OP_WRITE event.

// again, boilerplate-ish
public static void startServer() throws IOException {
    ServerSocketChannel ssc = ServerSocketChannel.open();
    ssc.configureBlocking(false);
    ssc.socket().bind(new InetSocketAddress(8086));

    Selector selector = Selector.open();
    ssc.register(selector, SelectionKey.OP_ACCEPT);

    while (true) {
        selector.select();
        Iterator<SelectionKey> it = selector.selectedKeys().iterator();
        while (it.hasNext()) {
            SelectionKey key = it.next();
            // must remove, otherwise last round's events accumulate --
            // the same event would be handled twice, which can cause errors
            it.remove();
            if (key.isAcceptable()) {
                System.out.println("ACCEPT");
                ServerSocketChannel ssChannel = (ServerSocketChannel) key.channel();
                SocketChannel sc = ssChannel.accept();
                sc.configureBlocking(false);
                sc.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocateDirect(BUF_SIZE));
            } else if (key.isReadable()) {
                System.out.print("READ:");
                SocketChannel sc = (SocketChannel) key.channel();
                ByteBuffer buf = (ByteBuffer) key.attachment();
                long bytesRead = sc.read(buf);
                while (bytesRead > 0) {
                    buf.flip();
                    while (buf.hasRemaining()) {
                        System.out.print((char) buf.get());
                    }
                    System.out.println();
                    buf.clear();
                    bytesRead = sc.read(buf);
                }
                if (bytesRead == -1) {
                    sc.close();
                }
            } else if (key.isWritable()) {
                // how is the OP_WRITE event triggered?
                System.out.print("WRITE:");
                ByteBuffer buf = (ByteBuffer) key.attachment();
                buf.flip();
                SocketChannel sc = (SocketChannel) key.channel();
                while (buf.hasRemaining()) {
                    sc.write(buf);
                }
                buf.compact();
            } else if (key.isConnectable()) {
                System.out.println("CONNECT");
            } else {
                System.out.println("UNKNOWN");
            }
        }
    }
}
  1. Without the it.remove() call, events pile up in the Selector's Set<SelectionKey> publicSelectedKeys, so the same event gets handled twice — which can cause errors.
  2. How is SelectionKey.OP_WRITE triggered? Most blog posts about selectors that I saw neither show how to trigger this event nor explain the read event.

First, after calling ssChannel.accept(), the resulting SocketChannel must additionally register the OP_WRITE event. That is, change it to:

SocketChannel sc = ssChannel.accept();
sc.configureBlocking(false);
sc.register(key.selector(), SelectionKey.OP_READ | SelectionKey.OP_WRITE, ByteBuffer.allocateDirect(BUF_SIZE));

Then the program appears to hang, printing WRITE endlessly. Why so many OP_WRITE events? Because Java NIO events are level-triggered: as long as the condition holds, an event fires — so as long as the kernel send buffer is not full, OP_WRITE keeps being reported.
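The usual remedy (my sketch, not part of the original post) is to subscribe to OP_WRITE only while there is pending data, and unsubscribe once it has been flushed:

// have data queued: start watching writability
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);

// inside the isWritable() branch, once the buffer has been fully drained:
if (!buf.hasRemaining()) {
    key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
}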

Level-triggered contrasts with edge-triggered, where an event fires only on a state change. This also gave me a new appreciation of the earlier note about Netty's events being edge-triggered.

3 | Netty: Importing the Source Code into IDEA

Things to watch out for when importing the Netty source into IDEA

Operating system

64-bit

Version issues

  • The official site says 64-bit OpenJDK 8 or above works. I didn't try OpenJDK; with Oracle's JDK you need version 1.8. The source uses the Unsafe class, which is no longer directly accessible in the same way in JDK versions after 1.8.

  • Keep IDEA's bitness the same as the operating system's.

Steps

  1. Preferably configure a Maven mirror first; the import pulls a lot of jars.
  2. Open IDEA, choose Import Project, pick the netty source directory, then choose Maven.
  3. Wait for the import to finish, find EchoServer and run its main method. It will fail at first; handle it as follows.

If the JDK in use is not 1.8, Unsafe will be reported as not found; in that case just set the Project SDK to JDK 1.8 in Project Structure.

For errors such as io.netty.util.collection.LongObjectMap not being found, run mvn clean compile in the netty-common module.

  4. Done.

Reference:
https://netty.io/wiki/setting-up-development-environment.html

2 | Netty: The Relationship Between Reactor and Netty

Today is February 2, 2020 — it feels like a rather special date, so here's a journal-style post, haha.

At first I was curious what exactly the Reactor pattern is — the name sounds terribly lofty. Reading the descriptions, I could follow them, yet didn't really grasp what they meant. What was missing was an intuitive image — the way hearing "apple" and then seeing an actual apple lets you connect the word with the thing. Studying Netty finally brought the chance to form that kind of intuition for Reactor.

What is the Reactor pattern?

The definition looks abstract, but it's actually easy to understand. It is a development pattern whose core flow is: register interest in events -> poll for whether interesting events have occurred -> handle the events once they occur. That's all. With BIO, every new incoming request spawns a new thread and the business logic runs in that new thread — that handling style is Thread-Per-Connection.

Correspondingly, with NIO there is also a pattern for handling requests and business logic: the Reactor pattern. As for how it concretely works — that is exactly the core flow described above.

The three versions of the Reactor pattern

Before starting this part, one question of mine: what is the relationship between Thread-Per-Connection and single-threaded Reactor?

The Thread-Per-Connection pattern

Diagram:

Pseudocode:
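The pseudocode block is missing here; the classic shape (after Doug Lea's "Scalable IO in Java" slides — names such as PORT and process() are illustrative placeholders) is roughly:

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

// Thread-Per-Connection: one thread per accepted connection, blocking I/O.
class ClassicServer implements Runnable {
    static final int PORT = 8086; // illustrative

    public void run() {
        try (ServerSocket ss = new ServerSocket(PORT)) {
            while (!Thread.interrupted()) {
                // every new connection gets its own thread
                new Thread(new Handler(ss.accept())).start();
            }
        } catch (IOException ex) { /* ... */ }
    }

    static class Handler implements Runnable {
        final Socket socket;
        Handler(Socket s) { socket = s; }

        public void run() {
            try {
                byte[] input = new byte[1024];
                socket.getInputStream().read(input);
                byte[] output = process(input); // decode, compute, encode
                socket.getOutputStream().write(output);
            } catch (IOException ex) { /* ... */ }
        }

        static byte[] process(byte[] in) { return in; } // placeholder business logic
    }
}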

The single-threaded Reactor pattern

I can't tell the execution flow from this diagram alone. I'll add an interpretation once I've understood it.

The multi-threaded Reactor pattern

The main-sub multi-threaded Reactor pattern

For server development, accepting connections is what matters most; the accept event gets registered separately with another reactor.

How the three Reactor modes are realized in Netty

The single-threaded mode and the (non-main-sub) multi-threaded Reactor mode differ only in the thread count passed to the constructor; if nothing is passed, the final thread count is derived by default from the number of CPU cores.
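For concreteness, a sketch of the three configurations (standard ServerBootstrap usage, not taken from the original post):

// Single-threaded Reactor: one group with exactly one thread.
EventLoopGroup single = new NioEventLoopGroup(1);
ServerBootstrap b1 = new ServerBootstrap();
b1.group(single);

// Multi-threaded Reactor: one group, several threads (0 = default, 2 * cores).
EventLoopGroup pool = new NioEventLoopGroup();
ServerBootstrap b2 = new ServerBootstrap();
b2.group(pool);

// Main-sub Reactor: two groups; the boss accepts, the workers handle I/O.
EventLoopGroup boss = new NioEventLoopGroup(1);
EventLoopGroup worker = new NioEventLoopGroup();
ServerBootstrap b3 = new ServerBootstrap();
b3.group(boss, worker);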

How Netty supports the main-sub Reactor mode

Take EchoServer from the netty-example module of the Netty source (branch 4.1) as the example.

Saving

It is a main-sub multi-threaded Reactor: bossGroup takes care of accept events, workerGroup takes care of the logic processing.

In ①, the two EventLoopGroups are passed into ServerBootstrap, and both EventLoopGroups are saved.

The saving logic executed in step ② is as follows:
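The screenshot is missing here; in the 4.1 source, ServerBootstrap#group looks roughly like this:

// ServerBootstrap.java (4.1, approximate) -- the "save" step
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
    super.group(parentGroup);                 // parent (boss) group saved in AbstractBootstrap
    if (this.childGroup != null) {
        throw new IllegalStateException("childGroup set already");
    }
    this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");
    return this;
}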

By step ③ the saving is complete. Once saved — when are they actually used?

Registering the channel with the parentGroup

First, the parentGroup's usage: find the places that use the group variable (Ctrl + B).

Inside is something like a plain getter method.

There is only one caller, also named group(), so we can keep walking up the call chain.

Then use Ctrl + Alt + H to view the callers of that group() method:

In initAndRegister() we find the code that registers the channel (the ServerSocketChannel) with that EventLoopGroup, as follows:
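The screenshot is missing; the line in question (quoted again further down in this post) is:

// AbstractBootstrap.java -> initAndRegister()
ChannelFuture regFuture = config().group().register(channel);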

Binding done.

Registering channels with the childGroup

Find the places that use the childGroup variable (Ctrl + B).

Only one place uses the childGroup, renaming it to currentChildGroup.

① the rename; ② childGroup is passed as a variable into ServerBootstrapAcceptor. ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter and overrides the parent's channelRead() method, in which it registers the newly arrived channel with the childGroup.

In other words, every new incoming connection — each SocketChannel — gets registered with the childGroup.

What initialization happens when a new connection is established

Back to the question raised at the init() method above: when is it called?

It is an abstract method of the AbstractBootstrap abstract class; two classes extend AbstractBootstrap, namely Bootstrap and ServerBootstrap. The only place init() is called is in initAndRegister().

The channel passed into init() is the ServerSocketChannel; the rough flow:

When the server — EchoServer — starts, a ServerBootstrapAcceptor is added to the ServerSocketChannel's pipeline, so every request from a client first passes through the ServerBootstrapAcceptor; its whole processing job is to register the SocketChannel with the childGroup.

Why is it said that Netty's main reactor mostly cannot use a whole thread group, only a single thread within the group?

Because the server starts only once; only during startup, when binding the port, is the ServerSocketChannel bound to the main reactor. So we trace upward from initAndRegister's callers, level by level:

public ChannelFuture bind(InetAddress inetHost, int inetPort) {
    return bind(new InetSocketAddress(inetHost, inetPort));
}

public ChannelFuture bind(SocketAddress localAddress) {
    validate();
    return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}

private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    // ...
}

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        channel = channelFactory.newChannel();
        // init() adds the ServerBootstrapAcceptor, which will later register
        // each newly accepted SocketChannel with the sub reactor
        init(channel);
    } catch (Throwable t) {
        // ...
    }
    // register the ServerSocketChannel with the main reactor
    ChannelFuture regFuture = config().group().register(channel);
    // ...
}

So a ServerSocketChannel is registered with only one group. But a question remains, related to EventLoopGroup, to be answered next: why can it use only one thread of that group rather than several? The following question answers it!

Netty 给 Channel 分配 NIO event loop 的规则是什么

Start from the config().group().register(channel) line in initAndRegister(), i.e. the code that registers the ServerSocketChannel with the main reactor (see above).

// start from the register call
ChannelFuture regFuture = config().group().register(channel);

// enter EventLoopGroup.java: it is an interface; for NIO the
// implementation to follow is MultithreadEventLoopGroup
ChannelFuture register(Channel channel);

@Override
public ChannelFuture register(Channel channel) {
    return next().register(channel);
}

@Override
public EventLoop next() {
    return (EventLoop) super.next();
}

// the parent class's implementation of next()
@Override
public EventExecutor next() {
    return chooser.next();
}

// following chooser.next(): chooser is declared as an interface type
private final EventExecutorChooserFactory.EventExecutorChooser chooser;
public interface EventExecutorChooserFactory {
    EventExecutorChooser newChooser(EventExecutor[] executors);
    @UnstableApi
    interface EventExecutorChooser {
        EventExecutor next();
    }
}

// the chooser is initialized from the configured thread count,
// in the constructor of MultithreadEventExecutorGroup
children = new EventExecutor[nThreads];
// one EventExecutor per requested thread; each is roughly equivalent to a Thread
chooser = chooserFactory.newChooser(children);

So chooser.next() returns an object roughly equivalent to a single Thread, which means this ServerSocketChannel will be served on that one thread only. The chooser simply picks one executor out of the group's threads to assign to the channel being registered. The concrete selection strategies:

// newChooser() picks between two concrete implementations of next()
public EventExecutorChooser newChooser(EventExecutor[] executors) {
    if (isPowerOfTwo(executors.length)) {
        return new PowerOfTwoEventExecutorChooser(executors);
    } else {
        return new GenericEventExecutorChooser(executors);
    }
}

// concise bit trick for testing whether val is a power of two
private static boolean isPowerOfTwo(int val) {
    return (val & -val) == val;
}

private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
    private final AtomicInteger idx = new AtomicInteger();
    private final EventExecutor[] executors;

    PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
        this.executors = executors;
    }

    @Override
    public EventExecutor next() {
        // round-robin via bit mask (valid because length is a power of two)
        return executors[idx.getAndIncrement() & executors.length - 1];
    }
}

private static final class GenericEventExecutorChooser implements EventExecutorChooser {
    private final AtomicInteger idx = new AtomicInteger();
    private final EventExecutor[] executors;

    GenericEventExecutorChooser(EventExecutor[] executors) {
        this.executors = executors;
    }

    @Override
    public EventExecutor next() {
        // round-robin via modulo
        return executors[Math.abs(idx.getAndIncrement() % executors.length)];
    }
}

With that, the earlier question finally has an answer.

1 | netty:粘包与拆包的处理

TCP is a stream protocol: a stream is a run of bytes with no boundaries. TCP splits data into segments according to the actual state of its buffers, so one complete business packet may be split across several TCP segments, or several business packets may be bundled into one large segment and sent together.

粘包与拆包现象

  1. The server reads two independent packets, D1 and D2, in two reads; no sticking or splitting;

  2. The server receives both packets in one read, D1 and D2 stuck together; this is TCP sticking (粘包);

  3. The server reads twice: first the complete D1 plus part of D2, then the rest of D2; this is TCP splitting (拆包);

  4. The server reads twice: first part of D1 (D1_1), then the rest of D1 (D1_2) together with the whole of D2.

产生原因

  1. the bytes written by a single application write() exceed the socket send buffer size;

  2. TCP segmentation according to the MSS;

  3. IP fragmentation when the Ethernet frame payload exceeds the MTU.

On Linux the default send buffer size is 16384 bytes. You can check with:

# receive
cat /proc/sys/net/ipv4/tcp_rmem
# min     default   max
# 4096    87380     6291456   (bytes)
# 4K      85K       6M
# send
cat /proc/sys/net/ipv4/tcp_wmem
# min     default   max
# 4096    16384     4194304   (bytes)
# 4K      16K       4M

The numbers above are from a Baidu Cloud server.

On macOS, sysctl net.inet.tcp is the closest reference, though I did not find parameters quite equivalent to the Linux ones.

如何解决

Netty如何解决

Netty handles this when decoding the received data: according to the configured strategy it splits the accumulated bytes back into complete business packets and passes each one on to the next handler. The splitting logic lives mainly in the ByteToMessageDecoder class.

Its channelRead() is invoked every time data is read from the TCP buffer. The method's logic (see the sketch after this list):

  1. use the accumulator cumulator to append the newly read ByteBuf to cumulation;
  2. run the decoder over the accumulated bytes.
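A simplified sketch of ByteToMessageDecoder#channelRead (release and exception handling trimmed):

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    if (msg instanceof ByteBuf) {
        CodecOutputList out = CodecOutputList.newInstance();
        ByteBuf data = (ByteBuf) msg;
        // step 1: accumulate the new bytes into `cumulation`
        boolean first = cumulation == null;
        cumulation = cumulator.cumulate(ctx.alloc(),
                first ? Unpooled.EMPTY_BUFFER : cumulation, data);
        // step 2: decode as many complete messages as possible
        callDecode(ctx, cumulation, out);
        // decoded messages are then fired down the pipeline (omitted)
    } else {
        ctx.fireChannelRead(msg);
    }
}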

累加器

There are two accumulators, MERGE_CUMULATOR and COMPOSITE_CUMULATOR. The former is the default: private Cumulator cumulator = MERGE_CUMULATOR;

MERGE_CUMULATOR first checks whether the buffer needs to grow, then copies the received msg into cumulation.

/**
 * Cumulate {@link ByteBuf}s by merge them into one {@link ByteBuf}'s, using memory copies.
 */
public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
    @Override
    public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
        try {
            final int required = in.readableBytes();
            if (required > cumulation.maxWritableBytes() ||
                    (required > cumulation.maxFastWritableBytes() && cumulation.refCnt() > 1) ||
                    cumulation.isReadOnly()) {
                // Expand cumulation (by replacing it) under the following conditions:
                // - cumulation cannot be resized to accommodate the additional data
                // - cumulation can be expanded with a reallocation operation to accommodate but the buffer is
                //   assumed to be shared (e.g. refCnt() > 1) and the reallocation may not be safe.
                return expandCumulation(alloc, cumulation, in);
            }
            return cumulation.writeBytes(in);
        } finally {
            // We must release in in all cases as otherwise it may produce a leak if writeBytes(...) throw
            // for whatever release (for example because of OutOfMemoryError)
            in.release();
        }
    }
};

Expansion works by allocating a new buffer big enough for the old data plus the new, copying both in order, and finally releasing the old buffer.

private static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf oldCumulation, ByteBuf in) {
    ByteBuf newCumulation = alloc.buffer(alloc.calculateNewCapacity(
            oldCumulation.readableBytes() + in.readableBytes(), MAX_VALUE));
    ByteBuf toRelease = newCumulation;
    try {
        newCumulation.writeBytes(oldCumulation);
        newCumulation.writeBytes(in);
        toRelease = oldCumulation;
        return newCumulation;
    } finally {
        toRelease.release();
    }
}

COMPOSITE_CUMULATOR instead stores each newly received buffer as a Component in the components array of a CompositeByteBuf, avoiding memory copies where possible.

/**
 * Cumulate {@link ByteBuf}s by add them to a {@link CompositeByteBuf} and so do no memory copy whenever possible.
 * Be aware that {@link CompositeByteBuf} use a more complex indexing implementation so depending on your use-case
 * and the decoder implementation this may be slower then just use the {@link #MERGE_CUMULATOR}.
 */
public static final Cumulator COMPOSITE_CUMULATOR = new Cumulator() {
    @Override
    public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
        try {
            if (cumulation.refCnt() > 1) {
                // Expand cumulation (by replace it) when the refCnt is greater then 1 which may happen when the
                // user use slice().retain() or duplicate().retain().
                //
                // See:
                // - https://github.com/netty/netty/issues/2327
                // - https://github.com/netty/netty/issues/1764
                return expandCumulation(alloc, cumulation, in);
            }
            final CompositeByteBuf composite;
            if (cumulation instanceof CompositeByteBuf) {
                composite = (CompositeByteBuf) cumulation;
            } else {
                composite = alloc.compositeBuffer(MAX_VALUE);
                composite.addComponent(true, cumulation);
            }
            composite.addComponent(true, in);
            in = null;
            return composite;
        } finally {
            if (in != null) {
                // We must release if the ownership was not transferred as otherwise it may produce a leak if
                // writeBytes(...) throw for whatever release (for example because of OutOfMemoryError).
                in.release();
            }
        }
    }
};

拆包解码流程

Inside callDecode(), decodeRemovalReentryProtection() invokes decode(), an abstract method implemented by subclasses. The main subclasses:

FixedLengthFrameDecoder

It has a frameLength field holding the fixed length of each message.

A decoder that splits the received ByteBufs by the fixed number of bytes. For example, if you received the following four fragmented packets:
+---+----+------+----+
| A | BC | DEFG | HI |
+---+----+------+----+

A FixedLengthFrameDecoder(3) will decode them into the following three packets with the fixed length:
+-----+-----+-----+
| ABC | DEF | GHI |
+-----+-----+-----+

The flow is simple: whenever the accumulated data holds at least frameLength bytes, slice frameLength bytes off the cumulation and return them as a new ByteBuf.

@Override
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    Object decoded = decode(ctx, in);
    if (decoded != null) {
        out.add(decoded);
    }
}

/**
 * Create a frame out of the {@link ByteBuf} and return it.
 *
 * @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
 * @param in the {@link ByteBuf} from which to read data
 * @return frame the {@link ByteBuf} which represent the frame or {@code null} if no frame could
 *         be created.
 */
protected Object decode(
        @SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {
    if (in.readableBytes() < frameLength) {
        return null; // not enough bytes yet; this decode() produces no message
    } else {
        return in.readRetainedSlice(frameLength);
    }
}

A question: if the final chunk of input arrives with length 2 * frameLength, can decoding go wrong? No, for two reasons (see the sketch after this list):

  1. callDecode() runs decode() in a loop while the cumulation still has readable bytes, so both frames come out in one pass;

  2. when the input ends, decodeLast() performs one final decode over whatever remains.
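A simplified sketch of the callDecode() loop (handler-removal checks trimmed):

protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    while (in.isReadable()) {
        int outSize = out.size();
        int oldInputLength = in.readableBytes();
        decodeRemovalReentryProtection(ctx, in, out); // delegates to decode()
        if (out.size() == outSize) {
            if (oldInputLength == in.readableBytes()) {
                // nothing consumed and nothing produced: wait for more data
                break;
            }
            continue; // bytes consumed but no message yet: try again
        }
        // a 2 * frameLength input simply yields one frame per iteration
    }
}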

LineBasedFrameDecoder

The flow: locate the line break in the accumulated data; if one exists and the frame does not exceed the maximum length, return the bytes parsed up to it. Finding the line break boils down to the sketch below.
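A sketch of the line-finding idea (essentially what LineBasedFrameDecoder#findEndOfLine does):

// returns the index of the line break, or -1 if none is present;
// for "\r\n" the returned index points at the '\r'
private static int findEndOfLine(ByteBuf buffer) {
    int i = buffer.forEachByte(ByteProcessor.FIND_LF); // first '\n'
    if (i > 0 && buffer.getByte(i - 1) == '\r') {
        i--;
    }
    return i;
}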

DelimiterBasedFrameDecoder

Splits on configurable delimiter bytes; if the delimiters happen to be the line delimiters, it delegates the decoding to a LineBasedFrameDecoder.

// inside decode()
if (lineBasedDecoder != null) {
    return lineBasedDecoder.decode(ctx, buffer);
}

// lineBasedDecoder is non-null exactly when the configured delimiters
// are the line delimiters; it is set up in the constructor:
if (isLineBased(delimiters) && !isSubclass()) {
    lineBasedDecoder = new LineBasedFrameDecoder(maxFrameLength, stripDelimiter, failFast);
    this.delimiters = null;
}

The check for whether the delimiters are the line delimiters:

private static boolean isLineBased(final ByteBuf[] delimiters) {
    if (delimiters.length != 2) {
        return false;
    }
    ByteBuf a = delimiters[0];
    ByteBuf b = delimiters[1];
    if (a.capacity() < b.capacity()) {
        a = delimiters[1];
        b = delimiters[0];
    }
    return a.capacity() == 2 && b.capacity() == 1
            && a.getByte(0) == '\r' && a.getByte(1) == '\n'
            && b.getByte(0) == '\n';
}

Since several delimiters may be configured, when the data contains more than one of them the decoder picks the delimiter that yields the shortest frame:

// Try all delimiters and choose the delimiter which yields the shortest frame.
int minFrameLength = Integer.MAX_VALUE;
ByteBuf minDelim = null;
for (ByteBuf delim : delimiters) {
    int frameLength = indexOf(buffer, delim);
    if (frameLength >= 0 && frameLength < minFrameLength) {
        minFrameLength = frameLength;
        minDelim = delim;
    }
}

For example, if you have the following data in the buffer:
+--------------+
| ABC\nDEF\r\n |
+--------------+

a DelimiterBasedFrameDecoder(Delimiters.lineDelimiter()) will choose '\n' as the first delimiter and produce two frames:
+-----+-----+
| ABC | DEF |
+-----+-----+

rather than incorrectly choosing '\r\n' as the first delimiter:
+----------+
| ABC\nDEF |
+----------+

LengthFieldBasedFrameDecoder

In short: a dedicated length field sits at the head of the data, and the body that follows is read according to that length.

This class is interesting in that comments make up roughly 2/5 of it. The main logic is decode(), which is under 100 lines; the comments mostly explain how different combinations of a few parameters produce different decoding behaviours.

The cases correspond to the table below:

| case | lengthFieldOffset | lengthFieldLength | lengthAdjustment | initialBytesToStrip |
|------|-------------------|-------------------|------------------|---------------------|
| 0x01 | 0                 | 2                 | 0                | 0                   |
| 0x02 | 0                 | 2                 | 0                | 2                   |
| 0x03 | 0                 | 2                 | -2               | 0                   |
| 0x04 | 2                 | 3                 | 0                | 0                   |
| 0x05 | 0                 | 3                 | 2                | 0                   |
| 0x06 | 1                 | 2                 | 1                | 3                   |
| 0x07 | 1                 | 2                 | -3               | 3                   |
0x01

lengthFieldLength = 2 means the first 2 bytes of the header hold the length field and everything after is the body, i.e. 0x000C (12) body bytes + 2 = 14 bytes in total.

0x02

Like 0x01, except initialBytesToStrip = 2: the decoded frame has its first initialBytesToStrip bytes cut off, so the decoded length is 14 - initialBytesToStrip = 12.

0x03

Here the length-field value counts the whole packet, including the length field itself. lengthAdjustment = -2 means the length-field value is adjusted by lengthAdjustment to obtain the body length.

0x04

Compared with 0x01, there is an extra offset lengthFieldOffset before the length field, so other data can precede it; the actual body then starts after lengthFieldOffset + lengthFieldLength.

0x05

Compared with 0x03, only the sign of lengthAdjustment differs: a positive value means the body starts at an offset after the length field, and the gap can carry a field of some other type.

0x06

On top of 0x04 and 0x05: the length field is shifted by lengthFieldOffset, the body is shifted by a further lengthAdjustment, and the first initialBytesToStrip bytes are stripped from the result.

0x07

Like 0x06, but lengthAdjustment is negative, analogous to the 0x03 case.

The overall decode flow
Setting aside the error handling, it computes the total frame length, skips the bytes it was asked to strip, and reads the message out of the ByteBuf, as in the sketch below.
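A simplified sketch of LengthFieldBasedFrameDecoder#decode (too-long-frame and error handling trimmed):

protected Object decode(ChannelHandlerContext ctx, ByteBuf in) {
    if (in.readableBytes() < lengthFieldEndOffset) {
        return null; // the length field itself has not fully arrived
    }
    int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;
    long frameLength = getUnadjustedFrameLength(
            in, actualLengthFieldOffset, lengthFieldLength, byteOrder);
    // total frame = length-field value + adjustment + bytes before the body
    frameLength += lengthAdjustment + lengthFieldEndOffset;
    if (in.readableBytes() < frameLength) {
        return null; // the complete frame has not arrived yet
    }
    // skip the bytes the caller asked to strip, then slice out the frame
    in.skipBytes(initialBytesToStrip);
    int readerIndex = in.readerIndex();
    int actualFrameLength = (int) frameLength - initialBytesToStrip;
    ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength);
    in.readerIndex(readerIndex + actualFrameLength);
    return frame;
}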

参考:

《Netty权威指南》

netty源码分析之拆包器的奥秘

两个Netty入门用法Demo

大概一年多前,用过Netty做局域网内自动组网,但是当时的主要代码不是我写的,并且时间过了很久,忘得差不多了,然而发现Netty确实是一个很有意思的框架,值得去深入研究、学习。本文的例子,之前也看过、写过,在各种介绍Netty的书籍中都有看到,并且Netty的官方文档也有这样的例子。

EchoServer

Netty官方Echo例子,其实在源码中也有该例子。

EchoServer

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class EchoServer {

    private int port;

    public EchoServer(int port) {
        this.port = port;
    }

    public void run() {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new EchoServerHandler());
                    }
                })
                .option(ChannelOption.SO_BACKLOG, 128)
                .childOption(ChannelOption.SO_KEEPALIVE, true);
        try {
            ChannelFuture f = b.bind(port).sync();
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        new EchoServer(8888).run();
        System.out.println("done");
    }
}

EchoServerHandler

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf in = (ByteBuf) msg;
        System.out.println("Server received : " + in.toString(CharsetUtil.UTF_8));
        ctx.writeAndFlush(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

EchoClient

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class EchoClient {

    private static int port = 8888;

    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup workGroup = new NioEventLoopGroup();

        try {
            Bootstrap b = new Bootstrap();
            b.group(workGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new EchoClientHandler());
                        }
                    });
            ChannelFuture f = b.connect("localhost", port).sync();
            f.channel().closeFuture().sync();
        } finally {
            workGroup.shutdownGracefully();
        }
    }
}

EchoClientHandler

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        ctx.writeAndFlush(Unpooled.copiedBuffer("Hello world", CharsetUtil.UTF_8));
        System.out.println("channel active");
    }

    // @Override
    // public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    //     ByteBuf m = (ByteBuf) msg; // (1)
    //     try {
    //         System.out.println("Client received : " + ((ByteBuf) msg).toString(CharsetUtil.UTF_8));
    //     } finally {
    //         m.release();
    //     }
    // }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        System.out.println("Client received: " + msg.toString(CharsetUtil.UTF_8));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

TimeServer

这个例子主要来自《Netty权威指南》,包括后面的粘包和拆包的例子都是基于此demo。

TimeServer

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

public class TimeServer {

    public void bind(int port) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new TimeServerHandler());
                        }
                    });
            ChannelFuture f = b.bind(port).sync();
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                System.out.println("invalid port argument");
            }
        }
        new TimeServer().bind(port);
    }
}

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.Date;

public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    private int counter;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String body = (String) msg;
        System.out.println("The time server receive order : " + body + "; the counter is : " + ++counter);
        String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? (new Date()).toString() : "BAD ORDER";
        currentTime = currentTime + System.getProperty("line.separator");
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
        ctx.writeAndFlush(resp);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

TimeClient

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

public class TimeClient {

    public void connect(String host, int port) throws Exception {
        NioEventLoopGroup group = new NioEventLoopGroup();

        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new TimeClientHandler());
                        }
                    });
            ChannelFuture f = b.connect(host, port).sync();
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                System.out.println("invalid port argument");
            }
        }

        new TimeClient().connect("localhost", port);
    }
}

TimeClientHandler

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class TimeClientHandler extends ChannelInboundHandlerAdapter {

    private int counter;

    private byte[] req;

    public TimeClientHandler() {
        req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf msg;
        for (int i = 0; i < 100; i++) {
            msg = Unpooled.buffer(req.length);
            msg.writeBytes(req);
            ctx.writeAndFlush(msg);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String body = (String) msg;
        System.out.println("Now is : " + body + "; the counter is : " + ++counter);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.info("Unexpected exception from downstream : " + cause.getMessage());
        ctx.close();
    }
}

The code above already handles sticky/split packets; to observe the phenomenon, just leave LineBasedFrameDecoder and StringDecoder out of the pipeline.

参考:
https://netty.io/wiki/index.html
https://netty.io/wiki/user-guide-for-4.x.html
