两个Netty入门用法Demo

大概一年多前,用过Netty做局域网内自动组网,但是当时的主要代码不是我写的,并且时间过了很久,忘得差不多了,然而发现Netty确实是一个很有意思的框架,值得去深入研究、学习。本文的例子,之前也看过、写过,在各种介绍Netty的书籍中都有看到,并且Netty的官方文档也有这样的例子。

EchoServer

Netty官方Echo例子,其实在源码中也有该例子。

EchoServer

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class EchoServer {

    private int port;

    public EchoServer(int port) {
        this.port = port;
    }

    public void run() {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new EchoServerHandler());
                    }
                })
                .option(ChannelOption.SO_BACKLOG, 128)
                .childOption(ChannelOption.SO_KEEPALIVE, true);
        try {
            ChannelFuture f = b.bind(port).sync();
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        new EchoServer(8888).run();
        System.out.println("运行完毕");
    }
}

EchoServerHandler

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;

import java.util.Arrays;
import java.util.List;

public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf in = ((ByteBuf) msg);
        System.out.println("Server received : " + in.toString(CharsetUtil.UTF_8));
        ctx.writeAndFlush(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

EchoClient

public class EchoClient {

    private static int port = 8888;

    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup workGroup  = new NioEventLoopGroup();

        try {
            Bootstrap b = new Bootstrap();
            b.group(workGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new EchoClientHandler());
                        }
                    });
            ChannelFuture f = b.connect("localhost", port).sync();
            f.channel().closeFuture().sync();
        } finally {
            workGroup.shutdownGracefully();
        }
    }
}

EchoClientHandler

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        ctx.writeAndFlush(Unpooled.copiedBuffer("Hello world", CharsetUtil.UTF_8));
        System.out.println("通道已打开");
    }

//    @Override
//    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//        ByteBuf m = (ByteBuf) msg; // (1)
//        try {
//            System.out.println("Client received : " + ((ByteBuf) msg).toString(CharsetUtil.UTF_8));
//        } finally {
//            m.release();
//        }
//    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        System.out.println("Client received: " + msg.toString(CharsetUtil.UTF_8));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

TimeServer

这个例子主要来自《Netty权威指南》,包括后面的粘包和拆包的例子都是基于此demo。

TimeServer

public class TimeServer {

    public void bind(int port) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new TimeServerHandler());
                        }
                    });
            ChannelFuture f = b.bind(port).sync();
            f.channel().closeFuture().sync();
        } catch (InterruptedException var9) {
            var9.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException var3) {
                System.out.println("输入参数有误");
            }
        }
        new TimeServer().bind(port);
    }
}

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.Date;

public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    private int counter;

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String body = (String) msg;
        System.out.println("The time server receive order : " + body + "; the counter is : " + ++counter);
        String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? (new Date()).toString() : "BAD ORDER";
        currentTime = currentTime + System.getProperty("line.separator");
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
        ctx.writeAndFlush(resp);
    }

    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

TimeClient

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

public class TimeClient {

    public void connect(String host, int port) throws Exception {
        NioEventLoopGroup group = new NioEventLoopGroup();

        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new ChannelHandler[]{new TimeClientHandler()});
                        }
                    });
            ChannelFuture f = b.connect(host, port).sync();
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException var3) {
                System.out.println("输入参数有误");
            }
        }

        new TimeClient().connect("localhost", port);
    }
}

TimeClientHandler

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.StandardCharsets;

@Slf4j
public class TimeClientHandler extends ChannelInboundHandlerAdapter {

    private int counter;

    private byte[] req;

    public TimeClientHandler() {
        req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes();
    }

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf msg;
        for (int i = 0; i < 100; i++) {
            msg = Unpooled.buffer(req.length);
            msg.writeBytes(req);
            ctx.writeAndFlush(msg);
        }
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String body = (String) msg;
        System.out.println("Now is : " + body + "; the counter is : " + ++ counter);
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.info("Unexpected exception from downstream : " + cause.getMessage());
        ctx.close();
    }
}

这里的代码是已经处理好了粘包/拆包问题,如果要看粘包/拆包的现象,只需要将LineBasedFrameDecoderStringDecoder不加入到pipeline中即可。

参考: https://netty.io/wiki/index.htmlhttps://netty.io/wiki/user-guide-for-4.x.html

Read more

Volcano 与 Kubernetes GPU 调度学习笔记

本笔记系统整理 Volcano 调度器、Kubernetes 调度框架、GPU Device Plugin、HAMi 等云原生 AI 调度领域的核心知识,适合用于学习、复习和工程实践参考。 目录 * 第一部分:Volcano 入门 * 1. Volcano 是什么 * 2. 安装与快速使用 * 3. 核心特性一览 * 第二部分:Volcano 整体架构 * 4. Volcano 解决的核心问题 * 5. 整体架构与数据流 * 6. 三层抽象模型 * 第三部分:Volcano 核心实现原理 * 7. Session 机制 * 8. Gang Scheduling 实现 * 9. Queue 与 DRF 公平调度

容器镜像(4):镜像的常用工具箱

容器镜像(4):镜像的常用工具箱

前几篇在讲多架构镜像时已经用过 skopeo 和 crane 做镜像复制,这篇系统整理这两个工具的完整能力,同时介绍几个日常操作镜像时同样好用的工具。 一、skopeo:不依赖 Daemon 的镜像瑞士军刀 skopeo 的核心价值是绕过 Docker daemon,直接与 Registry API 交互。上一篇用它做镜像复制和离线传输,但它的能力远不止于此。 1.1 安装 # Ubuntu / Debian sudo apt install -y skopeo skopeo --version # skopeo version 1.15.1 1.2 inspect:免拉取检查镜像元数据 docker inspect 需要先把镜像拉到本地,skopeo inspect 直接向 Registry

容器镜像(3):多架构镜像构建

容器镜像(3):多架构镜像构建

一、什么是多架构镜像 1.1 OCI Image Index 上一篇介绍了单平台镜像的结构:一个 Manifest 指向 Config 和若干 Layer blob。多架构镜像在此之上多了一层——OCI Image Index(也叫 Manifest List),是一个轻量的索引文件,把多个单平台 Manifest 组织在一起: $ docker manifest inspect golang:1.22-alpine { "schemaVersion": 2, "mediaType": "application/vnd.oci.image.index.v1+json", "manifests&

容器镜像(2):containerd 视角下的镜像

容器镜像(2):containerd 视角下的镜像

一、为什么需要了解 containerd 如果你只用 docker run 跑容器,从来不关心底层,那可以不了解 containerd。但如果你在用 Kubernetes,或者想真正理解"容器运行时"是什么,containerd 是绕不开的。 事实上,当你执行 docker run 的时候,containerd 早就在后台悄悄工作了——Docker 从 1.11 版本开始,就把核心运行时剥离出来交给 containerd 负责。 1.1 Docker 的架构演变 早期的 Docker(1.10 及之前)是一个"大一统"的单体程序:一个 dockerd