两个Netty入门用法Demo

大概一年多前,用过Netty做局域网内自动组网,但是当时的主要代码不是我写的,并且时间过了很久,忘得差不多了,然而发现Netty确实是一个很有意思的框架,值得去深入研究、学习。本文的例子,之前也看过、写过,在各种介绍Netty的书籍中都有看到,并且Netty的官方文档也有这样的例子。

EchoServer

Netty官方Echo例子,其实在源码中也有该例子。

EchoServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class EchoServer {

private int port;

public EchoServer(int port) {
this.port = port;
}

public void run() {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
try {
ChannelFuture f = b.bind(port).sync();
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}

public static void main(String[] args) {
new EchoServer(8888).run();
System.out.println("运行完毕");
}
}

EchoServerHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;

import java.util.Arrays;
import java.util.List;

public class EchoServerHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf in = ((ByteBuf) msg);
System.out.println("Server received : " + in.toString(CharsetUtil.UTF_8));
ctx.writeAndFlush(msg);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}

EchoClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class EchoClient {

private static int port = 8888;

public static void main(String[] args) throws InterruptedException {
EventLoopGroup workGroup = new NioEventLoopGroup();

try {
Bootstrap b = new Bootstrap();
b.group(workGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoClientHandler());
}
});
ChannelFuture f = b.connect("localhost", port).sync();
f.channel().closeFuture().sync();
} finally {
workGroup.shutdownGracefully();
}
}
}

EchoClientHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello world", CharsetUtil.UTF_8));
System.out.println("通道已打开");
}

// @Override
// public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// ByteBuf m = (ByteBuf) msg; // (1)
// try {
// System.out.println("Client received : " + ((ByteBuf) msg).toString(CharsetUtil.UTF_8));
// } finally {
// m.release();
// }
// }

@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
System.out.println("Client received: " + msg.toString(CharsetUtil.UTF_8));
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}

TimeServer

这个例子主要来自《Netty权威指南》,包括后面的粘包和拆包的例子都是基于此demo。

TimeServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class TimeServer {

public void bind(int port) {
EventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();

try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new TimeServerHandler());
}
});
ChannelFuture f = b.bind(port).sync();
f.channel().closeFuture().sync();
} catch (InterruptedException var9) {
var9.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

public static void main(String[] args) {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException var3) {
System.out.println("输入参数有误");
}
}
new TimeServer().bind(port);
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.Date;

public class TimeServerHandler extends ChannelInboundHandlerAdapter {

private int counter;

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String body = (String) msg;
System.out.println("The time server receive order : " + body + "; the counter is : " + ++counter);
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? (new Date()).toString() : "BAD ORDER";
currentTime = currentTime + System.getProperty("line.separator");
ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
ctx.writeAndFlush(resp);
}

public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}

TimeClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

public class TimeClient {

public void connect(String host, int port) throws Exception {
NioEventLoopGroup group = new NioEventLoopGroup();

try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new ChannelHandler[]{new TimeClientHandler()});
}
});
ChannelFuture f = b.connect(host, port).sync();
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}

public static void main(String[] args) throws Exception {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException var3) {
System.out.println("输入参数有误");
}
}

new TimeClient().connect("localhost", port);
}
}

TimeClientHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.StandardCharsets;

@Slf4j
public class TimeClientHandler extends ChannelInboundHandlerAdapter {

private int counter;

private byte[] req;

public TimeClientHandler() {
req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes();
}

public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf msg;
for (int i = 0; i < 100; i++) {
msg = Unpooled.buffer(req.length);
msg.writeBytes(req);
ctx.writeAndFlush(msg);
}
}

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String body = (String) msg;
System.out.println("Now is : " + body + "; the counter is : " + ++ counter);
}

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.info("Unexpected exception from downstream : " + cause.getMessage());
ctx.close();
}
}

这里的代码是已经处理好了粘包/拆包问题,如果要看粘包/拆包的现象,只需要将LineBasedFrameDecoderStringDecoder不加入到pipeline中即可。

参考:
https://netty.io/wiki/index.html
https://netty.io/wiki/user-guide-for-4.x.html

作者

遇寻

发布于

2019-08-27

更新于

2021-02-09

许可协议

评论