项目需要,对 Dubbo 进行了一次功能调研,主要集中在服务治理中的标签路由。这部分的内容不难,但是能够对 Dubbo 的实现有一定的了解。
需要 ZooKeeper、Java Application Based on Dubbo。
通过 CODING CD,在腾讯云的弹性伸缩组上(有兴趣可到 CODING CD 中详细了解),部署的一个实例,需要先安装 java 依赖,如下:
1 | java-1.8.0-openjdk-devel.x86_64 |
其主要的配置是一个脚本,如下:
1 | wget http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.6.1/apache-zookeeper-3.6.1-bin.tar.gz |
在安全组中确保端口 2181 开放。此时:
A 模块在调用 B 模块前,设置了一个 TAG_KEY,表示选择带有 TAG_KEY 的 B 服务:
1 |
|
应用的部署还是采用 CODING CD 部署在腾讯云的弹性伸缩组上。主要的配置如下:
A 模块启动:
1 | java -jar -Ddubbo.registry.address=zookeeper://172.21.0.34:2181 /root/a.jar 2>&1 & |
B 模块启动:
1 | java -jar -Ddubbo.registry.address=zookeeper://172.21.0.34:2181 /root/b.jar 2>&1 & |
dubbo-admin 使用的 github 中 develop 的最新版,按照相应的提示进行构建即可。
启动 dubbo-admin 的前端,此时的目录为 dubbo-admin/dubbo-admin-ui
1 | npm run dev |
启动 dubbo-admin 的后端,此时的目录为 dubbo-admin/dubbo-admin-server
修改 dubbo-admin/dubbo-admin-server/src/main/resources/application.properties
文件,改成正确的 zookeeper 地址。
1 | admin.registry.address=zookeeper://49.233.238.170:2181 |
编译并启动
1 | mvn clean package -DskipTests=true |
打开 dubbo-admin,找到标签路由,配置如下,应用名填写 moduleb:
1 | enabled: true |
TIPS
如果有遇到在 Dubbo Admin 中修改不生效的情况,考虑一下 Dubbo Admin 与 Dubbo 版本间的差异。在此踩到一个坑如下:
Dubbo Admin 管理页面是用 docker 跑起来的,它里面的源代码比较旧,路由设置到 zk 中的路径与 dubbo 读取 zk 的路径不一样。ISSUE 可见:https://github.com/apache/dubbo-admin/issues/577
请求流向:LB -> A -> B
A 中设置的 TAG_KEY 为:
1 | public String sayHello() { |
此时 B 模块服务有两个,即一个新、一个旧。
如果要只返回最新版本,可在 dubbo-admin 控制台的服务治理中,添加标签路由,新的实例在 tag1 下,旧的实例在 tag2 下,如下:
1 | enabled: true |
此时的返回如下:
如果只使用旧版本实例,可在将上面 tag1 和 tag2 下面的内容调换即可,此时返回如下:
如果要同时能够访问到新旧实例的内容,将上述配置中所有的 IP 全部填写到 tag1 下即可,此时的返回如下:
总共分三个大的方面,分别是配置的存储、配置同步、如何解析配置。
URL: /api/dev/rules/route/tag/moduleb
Request Method: PUT
在 dubbo-admin-server 中它的实现在文件 TagRoutesController.java
中,最终是将配置保存到 ZooKeeper 中,保存的路径为:
即:/dubbo/config/dubbo/moduleb.tag-router
在 ZooKeeper 中可以在相应的路径下看到相关的配置,如下:
在 TagRouter.java
中实现了 ConfigurationListener
接口,当配置变更时,会更新 tagRouterRule
。
1 |
|
源码位于 dubbo 中的 TagRouter.java
在上面的示例中,我们需要手动在 dubbo-admin 界面中,新增标签路由的配置,但经过对该配置的新增过程的简要分析,发现只是在 ZooKeeper 中新增一条记录。
有点开始怀疑人生。为什么我说不清楚netty的工作方式?博客基本上是自己一个字一个字敲出来的,也能在一定程度上说明,我当时确实是懂了,但为什么会说不出来呢?回顾了自己的博客,有些过程的细节确实忘了,但是可怕的是,我需要想半天才能想起来,有些还想不起来。我觉得方式有问题,单纯的文字记录,缺少指导性的图画,不利于理解整个流程。
接收客户端的消息,很明显是从Main Reactor所在的EventLoop的for循环中,通过select()获取到了OP_READ事件。
Update - 2020.3.23
Netty整个系列先暂停学习,我觉得目前应该学习的是Spring的一些更加深入的知识,不然有一种眼高手低的感觉,踏踏实实地把web的那一套先搞清楚。
当服务端启动好了之后,也就是说,服务端已经在执行
NioEventLoop
的一个死循环方法run()
中,一直轮询事件,并且此时的监听的事件为OP_ACCEPT
。如果有新连接接入,那么首先会在上述的run()
方法中触发…
首先,服务端启动好了之后,会进入等待事件的状态,也就是调用JDK的NIO的API:
1 | // NioEventLoop.java -> run() |
收到新的连接后,将会通过processSelectedKeys()
进行处理,处理内容包括:创建、初始化NioSocketChannel
。
1 | // NioEventLoop.java |
上面的unsafe的具体实现是在一个叫做NioMessageUnsafe
的内部类中,在它的read方法中:
①创建了NioSocketChannel
。②通过pipeline中的Handler,即ServerBootstrap$ServerBootstrapAcceptor中初始化NioSocketChannel。
1 | // |
创建NioSocketChannel的主要流程,就是先通过调用JDK的API获取SocketChannel,然后再将其作为一个值传给NioSocketChannel。因此从另一个方面来看,可以理解成NioSocketChannel是SocketChannel的封装。
1 |
|
主要依靠pipeline中的相应事件传递。比如说,将channel注册到EventLoop中这个事件,就是靠pipeline中的Handler,ServerBootstrapAcceptor来完成。
1 | // 调用pipeline中的不可覆盖方法fireChannelRead |
到这里,事件的往后续handler传递,都是调用上面的这个两个方法,来执行后续handler的相应read方法。此时pipeline中的handler有:
1 | 1) DefaultChannelPipeline$HeadContext |
其中,head对read只是做简单的传递:
1 | // DefaultChannelPipeline$HeadContext |
对LoggingHandler
而言,简易打印日志,并往后传递事件:
1 | // LoggingHandler.java |
到ServerBootstrapAcceptor
的read方法时,初始化便真正地开始了(此时的线程为bossGroup中的EventLoop):
1 | // ServerBootstrapAcceptor |
register的过程与服务端启动时的绑定类似,先选出一个EventLoop,选的时候,有两种方式,根据不同的线程数,使用不同的选择方式。然后经过辗转,来到对register0()
的执行,这个方法时主要的register操作。但是此时的线程是bossGroup中的EventLoop,而register0()
会在workGroup中的线程中执行。所以会先将task放入队列中,然后启动线程,并进入NioEventLoop的run()死循环方法,通过不断遍历是否有已监听事件以及执行队列中的任务,最终来执行该task。
1 | // MultithreadEventLoopGroup.java |
此时,线程切换到workGroup中的EventLoop。主要执行好几个操作:先调用jdk的api,注册selectionKey;再发布相应的事件;最后修改interestOps为OP_READ。
1 | private void register0(ChannelPromise promise) { |
调用jdk的api,注册selectionKey
1 | // AbstractNioChannel.java |
对channle已激活的事件传递中,会将NioSocketChannel的interestOps修改为OP_READ。下面的代码是事件在pipeline中的传递,与上面的分析内容一致,在此不多赘述。
1 | // DefaultChannelPipeline.java |
唯一有区别的是:pipeline中的head,即HeadContext对active事件的处理方式,多了一块对interestOps的处理:
1 | // DefaultChannelPipeline.java $ HeadContext |
这里会在pipeline中传递read事件,但是是从tail开始,可以直接跳到TailContext的read()方法中:
1 | // AbstractChannel.java |
此处便到了修改的interestOps的主要逻辑处:
1 | // AbstractChannel.java |
至此,对新建连接的处理基本完成。
目前对于Netty的理解是:一套完善了Java NIO操作的框架,因为Netty的最底层还是调用jdk的nio相关的API,但是又在jdk的nio基础上做了很多的封装,并衍生出来了自己相关的概念。
以EchoServer
为例,一条可参考的服务启动的主线操作如下:
main thread
boss thread
对应到代码中的操作依次为:
1 | // 1. 创建selector |
根据上面的启动主线,以它为一个参考,我觉得这个主线过于简略,也就是说很多操作并不能体现出来。所以我将创建一个server的步骤分成如下3个大的步骤。下面会对上述3个大步骤做适当、尽可能详细的分析,并将之前看过的源码内容与服务创建联系起来。
EventLoopGroup
EventLoopGroup
的个数决定具体reactor模式,在EchoServer
中使用了两个EventLoopGroup
,也就是使用了主从Reactor多线程模式;而EventLoopGroup
的类型决定使用的IO模式,这里使用的是``NioEventLoopGroup也就是使用的IO模式为NIO。对应于
EchoServer`中的代码为:
1 | EventLoopGroup bossGroup = new NioEventLoopGroup(1); |
到这里有若干个问题,但是主线中不会提及,因为主线关注的是操作链条,在大致了解主线操作之后,加强对个各个细节处的理解,才能理解得更加透彻。这里的问题主要来自心中的疑问。
EventLoopGroup
、EventLoop
之前的初略理解是:EventLoopGroup
是一个线程池、EventLoop
则对应一个线程。
EventLoopGroup
是一个接口,有多种实现方式,在EchoServer
中,它的实现是NioEventLoopGroup
。
Channel
、EventLoop
、EventLoopGroup
之间的关系如下图所示:
一个EventLoopGroup
包含一个或者多个EventLoop
;
一个EventLoop
在它的生命周期内只和一个Thread
绑定;
所有由EventLoop
处理的I/O事件都将在它专有的Thread
上被处理;
一个Channel
在它的生命周期内只注册于一个EventLoop
;
一个EventLoop
可能会被分配给一个或多个Channel
。
new NioEventLoopGroup(1)
传1在前面的一篇文章中有分析过为什么传1,因为对于boss group来说,只会有一个channel,所以只绑定1个线程,也就只需要1个EventLoop
。
如果不传任何参数,线程数在NioEventLoopGroup
中会先传0,在父类MultithreadEventLoopGroup
先再做判断,如果为0,那么会默认为:
1 | DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt( |
1 | // 在NioEventLoopGroup.java中 |
其中sun.nio.ch.DefaultSelectorProvider
在不同平台jdk中、其实现不一样,从而以此达到,统一不同平台下Selector
的实现。其中Windows下的实现如下:
1 | public class DefaultSelectorProvider { |
EventExecutor
数组并初始化在MultithreadEventExecutorGroup
的构造函数中,会初始化一个EventExecutor
数组,其实这就是NioEventLoop
数组。
1 | children = new EventExecutor[nThreads]; |
选择器是用来选择一个EventLoop
,供进行事件处理。具体有两种策略,一种是EventLoop个数为2的倍数(通过&运算)、一种是普通的(通过%运算)。说真的,我对这两者的差异没有啥感觉。
对于newChild()
这个方法,我们找到对应的NioEventLoopGroup
的实现:
也就是说,在初始化NioEventLoopGroup
的时候,就已经将所有的NioEventLoop
初始化完成。
打开openSelector()
操作。说明selector在初始化NioEventLoop
时就被打开。
1 | final SelectorTuple selectorTuple = openSelector(); |
初始化一个包装Runnable后的executor。
1 | this.executor = ThreadExecutorMap.apply(executor, this); |
Bootstrap
设置相关参数这一步骤有多个重要的过程。
NioServerSocketChannel
1 | // 指定channel类型为NioServerSocketChannel |
此处的channelFactory
是一个ReflectiveChannelFactory
,它的实现比较简单,通过class获取到构造器,然后通过构造器即可创建出新的对象。简单的逻辑如下:
1 | public ReflectiveChannelFactory(Class<? extends T> clazz) { |
初始化完channelFactory
之后,进入doBind()
阶段,此时会通过一个channelFactory
来创建一个新的对象,即NioServerSocketChannel
。如下:
1 | final ChannelFuture initAndRegister() { |
在NioServerSocketChannel
创建的时候,会创建一个ServerSocketChannel
,调用栈依次为:
1 | // SelectorProvider.provider()可参见前文中的内容 |
1 | final ChannelFuture initAndRegister() { |
在init()
中,主要是给新创建的NioServerSocketChannel
设置参数,然后在它的pipeline上面添加一个ChannelHandler
,用来处理新建立的连接,也就是ServerBootstrapAcceptor
。
1 |
|
其中,p.addLast()
执行的时候,如果当前channel,即NioServerSocketChannel
,没有注册到一个EventLoop,那么将会以PendingHandlerCallback
的形式,保存到pipeline的pengdingHandlerCallbackHead
这个链表上。
其中PendingHandlerCallback
可以简单理解成:只是一个保存有AbstractChannelHandlerContext
的Runnable
,并且具有单向链表结构。它的设计目的,就是等待某个时候被执行。
从ChannelFuture regFuture = config().group().register(channel);
开始,一路可以追踪到AbstractChannel
的register
方法里面。刚开始有一段状态判断的代码,这段代码可以印证前面的一个说法:一个channel只能注册到一个eventloop。
接着在eventLoop的线程中执行register0()
方法,也就是说main线程开始返回。
1 | // 紧接上面的代码 |
也就是说,initAndRegister()
返回之后会拿到一个ChannelFuture,
1 | final ChannelFuture initAndRegister() { |
如果在eventLoop中执行register0()
完毕,那么将继续执行doBind0()
,在main线程中;如果没执行完毕,会等register0()
在eventLoop中执行完毕之后,再执行doBind0()
,此时的线程是eventLoop。
1 | private ChannelFuture doBind(final SocketAddress localAddress) { |
至此,EchoServer
中ChannelFuture f = b.bind(PORT).sync()
中的b.bind(PORT)
在main线程中的操作已经完成。所以当前步骤又分成了两个,即:1. register0;2. doBind0。
此函数关键操作的流程如下:
1 | // AbstractChannel.java |
1 | // DefaultChannelPipeline.java |
PendingHandlerAddedTask
的结构在前面已有描述,这里补充一下它的代码实现。其实task.execute();
最终执行的代码是callHandlerAdded0(ctx)
。
1 | // DefaultChannelPipeline.java |
执行完ChanneInitializer
之后,将其从pipeline中移除。
1 | // DefaultChannelPipeline.java |
再来看一眼当时的initChannel
是如何实现的。
1 | // ServerBootStrap.java |
此时的pipeline.addLast(new ServerBootstrapAcceptor...)
中的ServerBootstrapAcceptor
,是不是也会像ChannleInitializer
一样被删除呢?
从直觉上来说,是不会的,因为如果删掉了谁来处理新建立的连接,那么它为什么没有被删掉呢?因为ServerBootstrapAcceptor
并没有重写handlerAdded()
这个方法,使用的父类默认实现,即啥都不干。它与ChannelInitializer
同是继承自ChannelInboundHandlerAdapter
,但是ChannelInitializer
重写了handlerAdded()
, 并在这个函数中,在初始化之后,将自身从pipeline上移除掉了。因为它只需要执行一次就即可。
使用了观察者模式,将执行之前注册到ChannelFuture上面的ChannelFutureListener
,也就是会接着执行doBind0()。这部分代码逻辑比较明了,看关键的代码片段即可:
经典的观察者模式,套路满满。
1 | // 多个Listener的时候,循环里面调用notifyListener0()方法 |
执行此ChannelFutureListener,也就是进入doBind0()阶段,这个操作内容不多,只是将bind的具体操作放到了Runnable中,然后扔到eventLoop的taskQueue中,等待下次eventLoop执行该task。
1 | // AbstractBootstrap.java |
关于消息的传递,可以放到后续的连接建立后的消息发送模块。这里先提出我的几个问题:
executionMask
?这个实现看起来很有趣,通过executionMask
与对应的事件编码相与,得出此Handler是否可以处理此消息。详细可后续进一步分析。至此register0的操作完成。
框架代码的流程:从pipeline的tail出发,触发bind事件。
1 | // AbstractBootstrap.java |
此处的tail是LoggingHandler
,也就是说会打印出bind日志:
1 | // AbstractChannelHandlerContext.java |
从这里开始,又回到这个代码串的bind()
方法处,直到执行invokeBind()时,才跳转到LoggingHandler
的前驱,一个DefaultChannelPipeline#HeadContext
,它才是bind操作的主要流程:
1 | // DefaultChannelPipeline#HeadContext |
调用栈:
至此bind操作完成。
与fireChannelRegistered
的调用类似,也是从pipeline的head开始触发,只是事件的名称换了,并且多了一个设置OP_ACCEPT
的操作。
1 | // DefaultChannelPipeline.java |
调用栈为:
通过eventLoop来执行task的方式,其实是通过一个taskQueue来接收这些Runnable,然后eventLoop再通过统一调度,获取taskQueue里面的task,然后依次执行它们。所以这里还有最后一个问题:eventLoop是怎么启动起来的以及如何进行调度?
时机:第一次在main线程中,调用EventLoop执行task的时候,会进行初始化。最早出现在initAndRegister()
这个方法中,这个方法进行register()的时候,会将register0放到一个Runnable中,扔给eventLoop去执行,即:
1 | // AbstractChannel.java |
下面的代码是对整个事件处理的核心:
1 | // NioEventLoop.java |
在正式开始Netty相关的学习之前,我决定还是要先回顾一下Java NIO,至少要对Java NIO相关的概念有一个了解,如Channel、ByteBuffer、Selector等。要自己动手写一写相关的demo实例、并且要尽可能地去了解其后面是如何实现的,也就是稍微看看相关jdk的源代码。
Java NIO 由以下几个核心部分组成:Buffer, Channel, Selector。传统的IO操作面向数据流,面向流 的 I/O 系统一次一个字节地处理数据,意味着每次从流中读一个或多个字节,直至完成,数据没有被缓存在任何地方;NIO操作面向缓冲区( 面向块),数据从Channel读取到Buffer缓冲区,随后在Buffer中处理数据。
可以理解成煤矿里面挖煤的小车,把煤从井底运地面上面。它的属性与子类如下:
Buffer是一个抽象类,继承自Object,拥有多个子类。此类在JDK源码中的注释如下:
A container for data of a specific primitive type.
A buffer is a linear, finite sequence of elements of a specific primitive type. Aside from its content, the essential properties of a buffer are its
capacity
,limit
, andposition
:
A buffer’s
capacity
is the number of elements it contains. The capacity of a buffer is never negative and never changes.A buffer’s
limit
is the index of the first element that should not be read or written. A buffer’s limit is never negative and is never greater than its capacity.写模式下,limit表示最多能往Buffer里写多少数据,等于capacity值;读模式下,limit表示最多可以读取多少数据,小于等于 capacity 值。
A buffer’s
position
is the index of the next element to be read or written. A buffer’s position is never negative and is never greater than its limit.There is one subclass of this class for each non-boolean primitive type.
Transferring data
Each subclass of this class defines two categories of get and put operations:
Relative operations read or write one or more elements starting at the current position and then increment the position by the number of elements transferred. If the requested transfer exceeds the limit then a relative get operation throws a
BufferUnderflowException
and a relative put operation throws aBufferOverflowException
; in either case, no data is transferred.Absolute operations take an explicit element index and do not affect the position. Absolute get and put operations throw an
IndexOutOfBoundsException
if the index argument exceeds the limit.Data may also, of course, be transferred in to or out of a buffer by the I/O operations of an appropriate channel, which are always relative to the current position.
Marking and resetting
A buffer’s
mark
is the index to which its position will be reset when the reset method is invoked. The mark is not always defined, but when it is defined it is never negative and is never greater than the position. If the mark is defined then it is discarded when the position or the limit is adjusted to a value smaller than the mark. If the mark is not defined then invoking the reset method causes anInvalidMarkException
to be thrown.Invariants
The following invariant holds for the mark, position, limit, and capacity values:
0 <= mark <= position <= limit <= capacity
A newly-created buffer always has a position of zero and a mark that is undefined. The initial limit may be zero, or it may be some other value that depends upon the type of the buffer and the manner in which it is constructed. Each element of a newly-allocated buffer is initialized to zero.
Clearing, flipping, and rewinding
In addition to methods for accessing the
position
,limit
, andcapacity
values and for marking and resetting, this class also defines the following operations upon buffers:
clear()
makes a buffer ready for a new sequence of channel-read or relative put operations: It sets the limit to the capacity and the position to zero.
flip()
makes a buffer ready for a new sequence of channel-write or relative get operations: It sets the limit to the current position and then sets the position to zero.
rewind()
makes a buffer ready for re-reading the data that it already contains: It leaves the limit unchanged and sets the position to zero.Read-only buffers
Every buffer is readable, but not every buffer is writable. The mutation methods of each buffer class are specified as optional operations that will throw a
ReadOnlyBufferException
when invoked upon a read-only buffer. A read-only buffer does not allow its content to be changed, but its mark, position, and limit values are mutable. Whether or not a buffer is read-only may be determined by invoking its isReadOnly method.Thread safety
Buffers are not safe for use by multiple concurrent threads. If a buffer is to be used by more than one thread then access to the buffer should be controlled by appropriate synchronization.
Invocation chaining
Methods in this class that do not otherwise have a value to return are specified to return the buffer upon which they are invoked. This allows method invocations to be chained; for example, the sequence of statements
1
2
3 b.flip();
b.position(23);
b.limit(42);can be replaced by the single, more compact statement
b.flip().position(23).limit(42);
clear()方法
1 | // 清除Buffer中的信息,只将参数恢复成默认 |
flip()方法
1 | // 将limit记录成当前的位置,指针指向头部,为读取做准备 |
rewind()方法
1 | // 指针指向头部,可以用于再次读取 |
遇到坑了,但是感觉可以透过这个问题更加深入理解Java NIO的这些概念。出现问题的代码:
1 | public static void fileChannel() throws IOException { |
上面的代码读取不到数据,一直在做循环,但是不输出数据。为什么?因为hasRemaining()
是以position
和limit
作对比,如下:
1 | public final boolean hasRemaining() { |
当从fileChannel
中读取数据到byteBuffer
中之后,limit
与capacity
相等(初始化既如此),此时的position
也与capacity
相同,导致hasRemaining()
为false
,无法向控制台输出。
所以需要将position设置成从0开始,让读取从0开始,直到读到之前的容量,所以使用flip()
来完成这个目的,即:
此时却发现控制台无限打印东西,为了弄明白这是为什么,我把byteBuffer
的大小调成了8,跑起来之后的输出如下:
这是为什么呢?这个问题应该与byteBuffer
里面的那几个参数有关系:
猜测应该是与fileChannel.read(byteBuffer)
中的具体实现有关。粗略看了看fileChannel.read(byteBuffer)
的实现,大致流程如下:
计算byteBuffer
的剩余量,即limit - position
。对于上面的情况,剩余量为0。
找出缓存buffer,此时缓存buffer为上次read得到的,第一次为空会直接分配;第二次read
的时候,其3大属性全部为8,也即上次读取的结果。
将缓存的buffer进行rewind()
、flip(剩余量)
,得到一个[pos=0, limit=0, capacity=8]的buffer。
进行读取的时候回根据缓存buffer的pos、limit来确定能读取的数量,也即:
1 | // 其中var1为缓存buffer |
如果能读到数据,会将缓存buffer里面的的内容再转移到byteBuffer
(也就是我们read()
里面传的ByteBuffer
)中:
1 | // var5即缓存buffer,读取内容到var5中 |
直到发现flip()
的注释里面有这样一段注释:
Compacts this buffer (optional operation).
The bytes between the buffer’s current position and its limit, if any, are copied to the beginning of the buffer. That is, the byte at indexp = position()
is copied to index zero, the byte at indexp + 1
is copied to index one, and so forth until the byte at indexlimit() - 1
is copied to indexn = limit() - 1 - p
. The buffer’s position is then set ton+1
and its limit is set to itscapacity
. Themark
, if defined, is discarded.The buffer’s
position
is set to the number of bytes copied, rather than to zero, so that an invocation of this method can be followed immediately by an invocation of another relative put method.Invoke this method after writing data from a buffer in case the write was incomplete. The following loop, for example, copies bytes from one channel to another via the buffer buf:
1
2
3
4
5
6 buf.clear(); // Prepare buffer for use
while (in.read(buf) >= 0 || buf.position != 0) {
buf.flip();
out.write(buf);
buf.compact(); // In case of partial write
}
加上这段代码buf.compact()
便可以正常读取文件内容。到这里就有点心累了,为什么写个读取都这么多坑。感觉有问题的时候往这三个参数上面想就行了。
煤矿厂里面运煤的通道,需要看的子类总共有4个,分别为:
FileChannel
:文件通道,用于文件的读和写。不支持非阻塞DatagramChannel
:用于 UDP 连接的接收和发送SocketChannel
:把它理解为 TCP 连接通道,简单理解就是 TCP 客户端ServerSocketChannel
:TCP 对应的服务端,用于监听某个端口进来的请求只有自己写过的代码才会有更深刻的印象,哪怕是从别的地方抄来的,自己慢慢debug一下,找出自己对代码的疑问,然后再去搞清楚这些问题,我觉得这样让我对它的了解会更深。这两段代码基本和网上的教程类似,大部分是抄的,但是自己有一定的加工,也遇到了1个问题,外加一个疑问。
客户端的代码很简单:①读标准输入。②发送给Server端。
1 | // Client端的代码很像八股文,这样弄就行了。 |
主要参考了一篇CSDN上的博客和一篇简书上的博客,简书上面的这边对我的帮助很大,十分感谢。我的问题主要有两点,第一个是少了it.remove();
,第二个是关于如何触发SelectionKey.OP_WRITE
事件。
1 | // 八股文的感觉。 |
it.remove()
方法的调用,那么会导致事件会堆积在Selector的Set<SelectionKey> publicSelectedKeys
中,引发对同一事件会处理两次,这样可能会导致报错。SelectionKey.OP_WRITE
?因为我看到大部分关于selector的博客,都没有写如何触发该事件,并且也未对读事件做出说明。首先肯定要在调用ssChannel.accept()
之后,将得到的SocketChannel
多注册一个OP_WRITE
事件。即修改成:
1 | SocketChannel sc = ssChannel.accept(); |
然后会发现程序卡死,屏幕一直输出Write
。为什么会有这么多OP_WRITE
事件?因为Java NIO的事件触发是水平触发,即只要满足条件,就触发一个事件,所以只要内核缓冲区还不满,就一直发出OP_WRITE
事件。
与水平触发对应的还有一个叫做边缘触发,即每当状态变化时,触发一个事件。对之前的Netty的事件是边缘触发又有了一个认识。
Netty源代码导入IDEA时需要注意的地方
64位
官网上面说可以用64-bit OpenJDK 8 or above 。没有尝试OpenJDK,Oracle的JDK要1.8版本的。源码里面用到了Unsafe
这个类,在jdk1.8之后的版本中被移除掉了。
IDEA的位数保持与操作系统位数相同
Import Project
,选择好netty源码目录后再选择maven。EchoServer
,跑main方法,这时会报错,按照如下方式操作即可。如果用的不是jdk1.8以上的jdk,会报Unsafe找不到,这种情况只需要在Project Structure
中将Project SDK
设置成jdk1.8即可。
如果是io.netty.util.collection.LongObjectMap
找不到之类的错误,可以在netty-common
模块中执行mvn clean compile
,可以按下图方式进行操作该指令。
参考:
https://netty.io/wiki/setting-up-development-environment.html
今天是2020年2月2号,感觉是一个比较特殊的日子,今天就来一篇记录型的博客吧,哈哈
起初很好奇,到底什么叫Reactor模式,这个名词感觉特别高大上,然后看描述,虽然能看懂描述,但是却不是特别明白到底是什么意思。这个时候主要是没有形成一种直观的印象,直观的印象就是比如说苹果,再给你看个实物,你就能把苹果与关联起来。在学习netty的时候,也遇到了Reactor模式,于是有了机会来形成一种比较直观的印象。
定义看起来很抽象,但是其实很好理解。它是一种开发模式,模式的核心流程:注册感兴趣的事件 -> 扫描是否有感兴趣的事件发生 -> 事件发生后做出相应的处理。仅此而已。使用BIO开发的时候,每有一个新的请求过来了,都会新开一个线程,然后在新的线程里面进行业务处理,这种处理方式就是Thread-Per-Connection;
所以对应起来,使用NIO开发的时候,也有一个模式去处理相应的请求与业务逻辑,叫做Reactor模式。至于具体怎么做,也就是前面提到的Reactor模式的核心流程。
开始这个之前我有一个疑问:Thread-Per-Connection与Reactor单线程有什么关系?
示意图:
伪代码:
从这张图里面看不懂其执行流是什么样的。待后续理解了再补上解读。
对服务器开发来说,很重要的事情是接收连接,accept事件会被单独注册到另外一个reactor中。
其中单线程和非主从reactor多线程模式的差别只在于new的时候传入的线程数量,不传的话,会默认以CPU的核数为依据来确定最终的线程数。
以netty项目源代码(分支4.1)中netty-example模块的EchoServer
为例。
它是一个主从reactor多线程模式,其中bossGroup
负责accept事件,workerGroup
负责逻辑处理。
在①中,分别将两个EventLoopGroup
传入到ServerBootstrap
中,并将这两个EventLoopGroup
保存起来。
步骤②执行的保存逻辑如下:
步骤③即已保存完毕。保存起来之后,什么时候使用呢?
parentGroup
先看parentGroup
的使用过程,找到使用了group
这个变量的地方Ctrl + B
:
进去之后,是一个类似于普通getter方法
只有一个地方调用,名称也叫group()
,所以还可以继续往上看调用者
然后使用Ctrl + Alt + H
查看该group()
方法的调用者:
在initAndRegister()
中可以找到将channel
(即ServerSocketChannel
)注册到该EventLoopGroup
的代码,如下:
绑定完毕。
channel
注册到childGroup
找到使用了childGroup
这个变量的地方Ctrl + B
:
只有个地方使用到了该childGroup
,并改名成了currentChildGroup
。
①改名,②将childGroup
作为一个变量,传入ServerBootstrapAcceptor
中。ServerBootstrapAcceptor
继承自ChannelInboundHandlerAdapter
,其覆盖了父类的channelRead()
方法,其中将新进来的channel
注册到childGroup
中。
也就是说,新进来的连接,即SocketChannel
,都会被注册到childGroup
中。
回到上面的init()
方法中提出的,它是何时被调用的这个问题中。
它是AbstractBootstrap
抽象类中的一个抽象方法,有两个类继承自AbstractBootstrap
,分别是Bootstrap
和ServerBootstrap
。调用init()
方法的地方只有一个,即initAndRegister()
中。
其中传入init()
的channel
为ServerSocketChannel
,其大致过程:
当服务端即EchoServer
启动的时候,会为ServerSocketChannel
的pipeline添加一个ServerBootstrapAcceptor
,所以每当有来自客户端的请求时,都会首先经过ServerBootstrapAcceptor
,让它先处理,而它的处理内容就是将SocketChannel
注册到childGroup
中。
因为服务端只会启动一次,只有在启动过程中去绑定端口号时才会将ServerSocketChannel
绑定到main reactor
上。所以这时候要从initAndRegister
的调用者逐级往上查看,如下;
1 | public ChannelFuture bind(InetAddress inetHost, int inetPort) { |
所以一个ServerSocketChannel
只会注册到一个group中。但还是个疑问,是与EventLoopGroup相关的,留待后续再来回答。这个问题的意思是说,只能用线程组里面的一个线程,为什么?为什么不能多个线程?下面这个问题可以回答这个疑问!
从initAndRegister()
中的config().group().register(channel)
代码出发,也就是ServerSocketChannel
注册到main reactor中的那段代码(参见上面)。
1 | // 从register方法进入 |
所以chooser.next()
返回的是一个等价于Thread的对象,也就是说这个ServerSocketChannel
只会在这个Thread中进行接收。其中的chooser就是根据线程数的个数,来选取一个线程分配给register进来的ServerSocketChannel
。具体分配策略:
1 | // next()是一个抽象方法,它的具体实现有两种 |
至此,上面的那个疑问算是有了一个答案。
TCP是个流协议,流是一串没有界限的数据。TCP会根据TCP缓冲区的实际情况对包进行划分。因此造成一个完整的业务包,会被TCP分成多个包、把多个包封装成一个大的包进行发送。
服务端分两次读取到了两个独立的数据包,分别是D1和D2,没有粘包和拆包;
服务端一次接收到了两个数据包,D1和D2粘合在一起,被称为TCP粘包;
服务端分两次读取到了两个数据包,第一次读取到了完整的D1包和D2包的部分内容,第二次读取到了D2包的剩余内容,这被称为TCP拆包;
服务端分两次读取到了两个数据包,第一次读取到了D1包的部分内容D1_1,第二次读取到了D1包的剩余内容D1_2和D2包的整包。
应用程序write写入的字节大小/大于套接口发送缓冲区大小;
进行MSS大小的TCP分段;
以太网帧的payload大于MTU进行IP分片。
对于Linux,发送缓冲区的默认值为:16384
。可使用下面命令查看:
1 | 接收 |
数据来自百度云的云服务器:
对于MacOS,可参考:sysctl net.inet.tcp
,但是好像没找到与linux类似的参数。
Netty中主要是在收到数据后,对数据进行处理解码处理时,根据不同的策略,进行了拆包操作,然后将得到的完整的业务数据包传递给下个处理逻辑。分割前后的逻辑主要在ByteToMessageDecoder
这个类中。它的继承如下:
每次从TCP缓冲区读到数据都会调用其channelRead()
方法。这个函数的处理逻辑是:
cumulator
将新读入的数据(ByteBuf
)存储到cumulation
中;存在两个累加器,MERGE_CUMULATOR
和COMPOSITE_CUMULATOR
。默认的是前者,即:private Cumulator cumulator = MERGE_CUMULATOR;
。
MERGE_CUMULATOR
会先判断是否需要扩容,然后再将收到的msg拷贝到cumulation
中。
1 | /** |
扩容的过程是先得到一个能够容纳下原数据+当前数据的收集器,然后将原数据和当前数据依次拷贝进入收集器,最后释放旧的收集器里面的数据。
1 | private static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf oldCumulation, ByteBuf in) { |
COMPOSITE_CUMULATOR
是将每个新收到的消息,作为一个Component
存储到收集器CompositeByteBuf
中的components
数组中。
1 | /** |
callDecode()
方法中的decodeRemovalReentryProtection()
将调用decode()
方法,其中decode()
是一个抽象方法,由子类去实现。主要的子类有:
FixedLengthFrameDecoder
里面有一个属性叫frameLength
,用来表示消息的长度。
1 | A decoder that splits the received ByteBufs by the fixed number of bytes. For example, if you received the following four fragmented packets: |
流程也比较简单,收集器里面的数据长度够frameLength
,就从收集器中截取frameLength
byte,然后返回一个新的ByteBuf
。
1 |
|
有一个问题,如果一次收到的数据长度为2 * frameLength
,且这个数据是最后一个数据,那么是否存在解码出现异常的情况?
有一个循环
输入结束的时候再次调用解码
LineBasedFrameDecoder
流程是先找到当前消息中的换行符,存在且没有超过最大长度,返回解释到的数据。
DelimiterBasedFrameDecoder
根据特定的字符进行分割,其中如果分割符是行标志,会调用LineBasedFrameDecoder
进行分割解码。
1 | // decode()方法中 |
判断分割符是否为行分割符的代码如下:
1 | private static boolean isLineBased(final ByteBuf[] delimiters) { |
因为分割字符可能是多个,当数据中存在多个分割字符的情况下,会用分割后得到的数据最短的那个分割字符。如下:
1 | // Try all delimiters and choose the delimiter which yields the shortest frame. |
For example, if you have the following data in the buffer:
+————–+
| ABC\nDEF\r\n |
+————–+
a DelimiterBasedFrameDecoder(Delimiters.lineDelimiter()) will choose ‘\n’ as the first delimiter and produce two frames:
+—–+—–+
| ABC | DEF |
+—–+—–+
rather than incorrectly choosing ‘\r\n’ as the first delimiter:
+———-+
| ABC\nDEF |
+———-+
LengthFieldBasedFrameDecoder
简而言之,就是在数据的头部,放一个专门的长度位,根据长度位来读取后面信息的内容。
这个类比较有意思,注释差不多占了2/5。主要的处理逻辑是decode()
,但是这个方法100行都不到。注释主要解释了这个类里面几个参数的不同配置,产生不同的处理情况。
情况对应于下表:
lengthFieldOffset | lengthFieldLength | lengthAdjustment | initialBytesToStrip | |
---|---|---|---|---|
0x01 | 0 | 2 | 0 | 0 |
0x02 | 0 | 2 | 0 | 2 |
0x03 | 0 | 2 | -2 | 0 |
0x04 | 2 | 3 | 0 | 0 |
0x05 | 0 | 3 | 2 | 0 |
0x06 | 1 | 2 | 1 | 3 |
0x07 | 1 | 2 | -3 | 3 |
lengthFieldLength = 2
表示长度位占头部的2 bytes,剩下的都是消息占位,也就是0x000C(12) + 2 = 14
。
与0x01
类似,只是多了initialBytesToStrip = 2
,解码后的内容截取掉了头部的initialBytesToStrip
位。也就是解码后的长度为14 - initialBytesToStrip = 12
。
这种情况下,长度位的值,表示整个包的长度,包括长度位本身的长度。lengthAdjustment = -2
表示要将长度位的值加上lengthAdjustment
,作为消息的长度。
与0x01
相比,多了个一个长度位的偏移量lengthFieldOffset
。所以长度位的前面又可以放一些其他数据。也就是说,真正的消息是从lengthFieldOffset + lengthFieldLength
后开始。
与0x03
对比,只是lengthAdjustment
的正负不同,也就意味着真实的消息是在长度位后面是有偏移的,而偏移出来的空间,可以用作存放另外一种数据类型。
在0x04
、0x05
的基础上,长度位多了偏移lengthFieldOffset
,真实的消息的偏移又多加了一个lengthAdjustment
,然后截掉了头部开始的initialBytesToStrip
bytes。
在0x06
的基础上,lengthAdjustment
变成负数了,与0x03
的情况类似。
整体代码的流程
除去异常处理的情况,就是计算整个消息的长度,然后跳过要求跳过的字节数,再从ByteBuf
中读取消息。如下:
参考: