大概一年多前,用过Netty做局域网内自动组网,但是当时的主要代码不是我写的,并且时间过了很久,忘得差不多了,然而发现Netty确实是一个很有意思的框架,值得去深入研究、学习。本文的例子,之前也看过、写过,在各种介绍Netty的书籍中都有看到,并且Netty的官方文档也有这样的例子。
EchoServer
Netty官方Echo例子,其实在源码中也有该例子。
EchoServer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;
public class EchoServer {
private int port;
public EchoServer(int port) { this.port = port; }
public void run() { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new EchoServerHandler()); } }) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true); try { ChannelFuture f = b.bind(port).sync(); f.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } }
public static void main(String[] args) { new EchoServer(8888).run(); System.out.println("运行完毕"); } }
|
EchoServerHandler
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; import io.netty.util.ReferenceCountUtil;
import java.util.Arrays; import java.util.List;
/**
 * Inbound handler for the echo server: prints every received buffer and
 * writes the very same message back to the peer.
 */
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * Prints the payload decoded as UTF-8 and echoes the message object back.
     *
     * @param ctx context used to write the reply
     * @param msg the received message; it is cast to {@link ByteBuf}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf received = (ByteBuf) msg;
        String text = received.toString(CharsetUtil.UTF_8);
        System.out.println("Server received : " + text);
        ctx.writeAndFlush(msg);
    }

    /**
     * No-op: every message is already flushed in {@link #channelRead}.
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    }

    /**
     * Prints the stack trace of the failure and closes the channel.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
|
EchoClient
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public class EchoClient {
    // Echo client: connects to localhost on the configured port, installs an
    // EchoClientHandler and blocks until the connection is closed.

    /** Port of the echo server to connect to. */
    private static int port = 8888;

    /**
     * Connects to the echo server and waits for the channel to close.
     *
     * @param args ignored
     * @throws InterruptedException if the thread is interrupted while waiting
     *                              for the connect or the close to complete
     */
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup workGroup = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(workGroup);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new EchoClientHandler());
                }
            });

            // Block until connected, then block until the channel is closed.
            ChannelFuture connected = bootstrap.connect("localhost", port).sync();
            connected.channel().closeFuture().sync();
        } finally {
            workGroup.shutdownGracefully();
        }
    }
}
|
EchoClientHandler
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.CharsetUtil;
/**
 * Client-side handler of the echo example: sends a greeting once the channel
 * becomes active and prints whatever comes back.
 */
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    /**
     * Sends "Hello world" (UTF-8) as soon as the channel is active, after
     * delegating to the superclass implementation.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        ByteBuf greeting = Unpooled.copiedBuffer("Hello world", CharsetUtil.UTF_8);
        ctx.writeAndFlush(greeting);
        System.out.println("通道已打开");
    }

    /**
     * Prints the received buffer decoded as UTF-8.
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        String text = msg.toString(CharsetUtil.UTF_8);
        System.out.println("Client received: " + text);
    }

    /**
     * Prints the stack trace of the failure and closes the channel.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
|
TimeServer
这个例子主要来自《Netty权威指南》,包括后面的粘包和拆包的例子都是基于此demo。
TimeServer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| public class TimeServer {
    // Line-based time server: each accepted channel gets a LineBasedFrameDecoder,
    // a StringDecoder and a TimeServerHandler, in that order.

    /**
     * Binds the server to the given port and blocks until the server channel
     * is closed. Both event loop groups are asked to shut down before returning.
     *
     * @param port TCP port to listen on
     */
    public void bind(int port) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // Split the byte stream into lines of at most 1024 bytes,
                            // then decode each line to a String for the handler.
                            ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new TimeServerHandler());
                        }
                    });

            // Wait for the bind to complete, then wait until the server channel closes.
            ChannelFuture f = b.bind(port).sync();
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    /**
     * Starts the time server. The first argument, if present, is the port;
     * when it is missing or not a valid integer the default 8080 is used.
     *
     * @param args optional command-line arguments; {@code args[0]} is the port
     */
    public static void main(String[] args) {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.parseInt(args[0]);
            } catch (NumberFormatException e) {
                // Deliberate best effort: report the bad argument and keep the default port.
                System.out.println("输入参数有误");
            }
        }
        new TimeServer().bind(port);
    }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.util.Date;
public class TimeServerHandler extends ChannelInboundHandlerAdapter {
private int counter;
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String body = (String) msg; System.out.println("The time server receive order : " + body + "; the counter is : " + ++counter); String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? (new Date()).toString() : "BAD ORDER"; currentTime = currentTime + System.getProperty("line.separator"); ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes()); ctx.writeAndFlush(resp); }
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); }
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
|
TimeClient
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder;
/**
 * Client of the time example: connects to the server with a line-based
 * String pipeline and a {@link TimeClientHandler}.
 */
public class TimeClient {

    /**
     * Connects to the given endpoint and blocks until the channel is closed.
     * The event loop group is asked to shut down before this method returns.
     *
     * @param host server host name
     * @param port server port
     * @throws Exception if connecting or waiting for the close fails
     */
    public void connect(String host, int port) throws Exception {
        NioEventLoopGroup group = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.option(ChannelOption.TCP_NODELAY, true);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    // Frame by line (max 1024 bytes), decode to String, then handle.
                    ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                    ch.pipeline().addLast(new StringDecoder());
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });

            // Block until connected, then block until the channel is closed.
            ChannelFuture connected = bootstrap.connect(host, port).sync();
            connected.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    /**
     * Starts the client against localhost. The first argument, if present, is
     * the port; when it is missing or not a valid integer 8080 is used.
     *
     * @param args optional command-line arguments; {@code args[0]} is the port
     * @throws Exception propagated from {@link #connect(String, int)}
     */
    public static void main(String[] args) throws Exception {
        int port = 8080;
        boolean hasPortArgument = args != null && args.length > 0;
        if (hasPortArgument) {
            try {
                port = Integer.parseInt(args[0]);
            } catch (NumberFormatException ignored) {
                // Keep the default port and only report the bad argument.
                System.out.println("输入参数有误");
            }
        }

        new TimeClient().connect("localhost", port);
    }
}
|
TimeClientHandler
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
@Slf4j public class TimeClientHandler extends ChannelInboundHandlerAdapter {
private int counter;
private byte[] req;
public TimeClientHandler() { req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes(); }
public void channelActive(ChannelHandlerContext ctx) throws Exception { ByteBuf msg; for (int i = 0; i < 100; i++) { msg = Unpooled.buffer(req.length); msg.writeBytes(req); ctx.writeAndFlush(msg); } }
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String body = (String) msg; System.out.println("Now is : " + body + "; the counter is : " + ++ counter); }
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.info("Unexpected exception from downstream : " + cause.getMessage()); ctx.close(); } }
|
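这里的代码已经处理好了粘包/拆包问题。如果想观察粘包/拆包的现象,只需不把 LineBasedFrameDecoder 和 StringDecoder 加入到 pipeline 中即可;注意此时 handler 收到的 msg 是 ByteBuf 而不是 String,channelRead 中的 (String) msg 强制转换需要改为从 ByteBuf 中读取字节后再转成字符串。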
参考:
https://netty.io/wiki/index.html
https://netty.io/wiki/user-guide-for-4.x.html