两个Netty入门用法Demo
大概一年多前,用过Netty做局域网内自动组网,但是当时的主要代码不是我写的,并且时间过了很久,忘得差不多了,然而发现Netty确实是一个很有意思的框架,值得去深入研究、学习。本文的例子,之前也看过、写过,在各种介绍Netty的书籍中都有看到,并且Netty的官方文档也有这样的例子。
EchoServer
Netty官方Echo例子,其实在源码中也有该例子。
EchoServer
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class EchoServer {
private int port;
public EchoServer(int port) {
this.port = port;
}
public void run() {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
try {
ChannelFuture f = b.bind(port).sync();
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
new EchoServer(8888).run();
System.out.println("运行完毕");
}
}
EchoServerHandler
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;
import java.util.Arrays;
import java.util.List;
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf in = ((ByteBuf) msg);
System.out.println("Server received : " + in.toString(CharsetUtil.UTF_8));
ctx.writeAndFlush(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
EchoClient
public class EchoClient {
private static int port = 8888;
public static void main(String[] args) throws InterruptedException {
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(workGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoClientHandler());
}
});
ChannelFuture f = b.connect("localhost", port).sync();
f.channel().closeFuture().sync();
} finally {
workGroup.shutdownGracefully();
}
}
}
EchoClientHandler
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello world", CharsetUtil.UTF_8));
System.out.println("通道已打开");
}
// @Override
// public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// ByteBuf m = (ByteBuf) msg; // (1)
// try {
// System.out.println("Client received : " + ((ByteBuf) msg).toString(CharsetUtil.UTF_8));
// } finally {
// m.release();
// }
// }
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
System.out.println("Client received: " + msg.toString(CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
TimeServer
这个例子主要来自《Netty权威指南》,包括后面的粘包和拆包的例子都是基于此demo。
TimeServer
public class TimeServer {
public void bind(int port) {
EventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new TimeServerHandler());
}
});
ChannelFuture f = b.bind(port).sync();
f.channel().closeFuture().sync();
} catch (InterruptedException var9) {
var9.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException var3) {
System.out.println("输入参数有误");
}
}
new TimeServer().bind(port);
}
}
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.Date;
public class TimeServerHandler extends ChannelInboundHandlerAdapter {
private int counter;
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String body = (String) msg;
System.out.println("The time server receive order : " + body + "; the counter is : " + ++counter);
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? (new Date()).toString() : "BAD ORDER";
currentTime = currentTime + System.getProperty("line.separator");
ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
ctx.writeAndFlush(resp);
}
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
TimeClient
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
public class TimeClient {
public void connect(String host, int port) throws Exception {
NioEventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new ChannelHandler[]{new TimeClientHandler()});
}
});
ChannelFuture f = b.connect(host, port).sync();
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException var3) {
System.out.println("输入参数有误");
}
}
new TimeClient().connect("localhost", port);
}
}
TimeClientHandler
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
@Slf4j
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
private int counter;
private byte[] req;
public TimeClientHandler() {
req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes();
}
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf msg;
for (int i = 0; i < 100; i++) {
msg = Unpooled.buffer(req.length);
msg.writeBytes(req);
ctx.writeAndFlush(msg);
}
}
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String body = (String) msg;
System.out.println("Now is : " + body + "; the counter is : " + ++ counter);
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.info("Unexpected exception from downstream : " + cause.getMessage());
ctx.close();
}
}
这里的代码是已经处理好了粘包/拆包问题,如果要看粘包/拆包的现象,只需要将LineBasedFrameDecoder和StringDecoder不加入到pipeline中即可。
参考: https://netty.io/wiki/index.htmlhttps://netty.io/wiki/user-guide-for-4.x.html