两个Netty入门用法Demo

大概一年多前,用过Netty做局域网内自动组网,但是当时的主要代码不是我写的,并且时间过了很久,忘得差不多了,然而发现Netty确实是一个很有意思的框架,值得去深入研究、学习。本文的例子,之前也看过、写过,在各种介绍Netty的书籍中都有看到,并且Netty的官方文档也有这样的例子。

EchoServer

Netty官方Echo例子,其实在源码中也有该例子。

EchoServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * Minimal Netty echo server: accepts TCP connections on the given port and
 * echoes every received message back to the client via {@link EchoServerHandler}.
 */
public class EchoServer {

    /** Port the server binds to; set once in the constructor. */
    private final int port;

    public EchoServer(int port) {
        this.port = port;
    }

    /**
     * Bootstraps the server and blocks until the server channel is closed.
     * Both event loop groups are always shut down on exit.
     */
    public void run() {
        EventLoopGroup bossGroup = new NioEventLoopGroup();   // accepts incoming connections
        EventLoopGroup workGroup = new NioEventLoopGroup();   // serves established channels
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new EchoServerHandler());
                    }
                })
                .option(ChannelOption.SO_BACKLOG, 128)
                .childOption(ChannelOption.SO_KEEPALIVE, true);
        try {
            ChannelFuture f = b.bind(port).sync();
            f.channel().closeFuture().sync();  // block until the server channel closes
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        new EchoServer(8888).run();
        System.out.println("运行完毕");
    }
}

EchoServerHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;

import java.util.Arrays;
import java.util.List;

/**
 * Inbound handler for the echo server: logs each received message and writes
 * it straight back to the peer.
 */
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        final ByteBuf received = (ByteBuf) msg;
        System.out.println("Server received : " + received.toString(CharsetUtil.UTF_8));
        // Echo the buffer back unchanged; writeAndFlush takes over the reference count.
        ctx.writeAndFlush(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // Intentionally empty: keep the connection open after each read burst.
        // ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

EchoClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
 * Minimal Netty echo client: connects to the local {@code EchoServer}, sends a
 * greeting through {@link EchoClientHandler} and prints the echoed reply.
 */
public class EchoClient {

    /** Port of the echo server; must match the one {@code EchoServer} binds to. */
    private static final int port = 8888;

    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup workGroup = new NioEventLoopGroup();

        try {
            Bootstrap b = new Bootstrap();
            b.group(workGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new EchoClientHandler());
                        }
                    });
            ChannelFuture f = b.connect("localhost", port).sync();
            f.channel().closeFuture().sync();  // block until the connection closes
        } finally {
            // Always release the event loop threads, even on interruption.
            workGroup.shutdownGracefully();
        }
    }
}

EchoClientHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

/**
 * Inbound handler for the echo client: sends a greeting once the channel
 * becomes active and prints every echoed message.
 *
 * <p>{@link SimpleChannelInboundHandler} releases the inbound {@link ByteBuf}
 * automatically after {@code channelRead0} returns, so no manual
 * {@code release()} is needed here.
 */
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        ctx.writeAndFlush(Unpooled.copiedBuffer("Hello world", CharsetUtil.UTF_8));
        System.out.println("通道已打开");
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        System.out.println("Client received: " + msg.toString(CharsetUtil.UTF_8));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

TimeServer

这个例子主要来自《Netty权威指南》,包括后面的粘包和拆包的例子都是基于此demo。

TimeServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
 * Time server demo: replies to "QUERY TIME ORDER" requests with the current
 * time. The line-based decoder pipeline handles TCP sticky/split packets.
 */
public class TimeServer {

    /**
     * Bind the server to the given port and block until it is closed.
     * Both event loop groups are always shut down on exit.
     */
    public void bind(int port) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();      // accepts incoming connections
        NioEventLoopGroup workerGroup = new NioEventLoopGroup(); // serves established channels

        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // Frame by newline, decode to String, then handle.
                            ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new TimeServerHandler());
                        }
                    });
            ChannelFuture f = b.bind(port).sync();
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                // parseInt avoids the needless Integer boxing of valueOf.
                port = Integer.parseInt(args[0]);
            } catch (NumberFormatException e) {
                System.out.println("输入参数有误");
            }
        }
        new TimeServer().bind(port);
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.nio.charset.StandardCharsets;
import java.util.Date;

public class TimeServerHandler extends ChannelInboundHandlerAdapter {

private int counter;

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String body = (String) msg;
System.out.println("The time server receive order : " + body + "; the counter is : " + ++counter);
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? (new Date()).toString() : "BAD ORDER";
currentTime = currentTime + System.getProperty("line.separator");
ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
ctx.writeAndFlush(resp);
}

public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}

TimeClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

/**
 * Client for the time server: connects, sends "QUERY TIME ORDER" requests via
 * {@link TimeClientHandler} and prints the responses.
 */
public class TimeClient {

    /**
     * Connect to the given host/port and block until the channel closes.
     * The event loop group is always shut down on exit.
     */
    public void connect(String host, int port) throws Exception {
        NioEventLoopGroup group = new NioEventLoopGroup();

        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // Frame by newline, decode to String, then handle.
                            ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new TimeClientHandler());
                        }
                    });
            ChannelFuture f = b.connect(host, port).sync();
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                // parseInt avoids the needless Integer boxing of valueOf.
                port = Integer.parseInt(args[0]);
            } catch (NumberFormatException e) {
                System.out.println("输入参数有误");
            }
        }

        new TimeClient().connect("localhost", port);
    }
}

TimeClientHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.StandardCharsets;

@Slf4j
public class TimeClientHandler extends ChannelInboundHandlerAdapter {

private int counter;

private byte[] req;

public TimeClientHandler() {
req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes();
}

public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf msg;
for (int i = 0; i < 100; i++) {
msg = Unpooled.buffer(req.length);
msg.writeBytes(req);
ctx.writeAndFlush(msg);
}
}

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String body = (String) msg;
System.out.println("Now is : " + body + "; the counter is : " + ++ counter);
}

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.info("Unexpected exception from downstream : " + cause.getMessage());
ctx.close();
}
}

这里的代码是已经处理好了粘包/拆包问题,如果要看粘包/拆包的现象,只需不将LineBasedFrameDecoder和StringDecoder加入到pipeline中即可。

参考:
https://netty.io/wiki/index.html
https://netty.io/wiki/user-guide-for-4.x.html

使用阿里云的RocketMQ中遇到的若干问题

Python的TCP订阅模式官方文档上面RocketMQ的Python版本只支持HTTP的GroupID,链接为:https://github.com/aliyunmq/mq-http-python-sdk。HTTP版本的SDK看起来是以轮询的形式来获取数据的,并且在控制台上也看不到使用HTTP版本
阅读更多

一种基于redis的分布式锁的实现(当前项目中在使用)

主要思想是利用redis执行命令时的单线程特性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
/**
 * Distributed lock backed by Redis, relying on the fact that Redis executes
 * commands on a single thread. The value stored under the lock key is the
 * lease expiry time in milliseconds (cache server clock); an expired lease
 * can be taken over via getAndSet.
 *
 * <p>NOTE(review): when several clients race to take over an expired lock,
 * the losing getAndSet calls still overwrite the winner's expiry time,
 * slightly extending the lease — acceptable here, but not strict mutual
 * exclusion.
 */
@Component
public class DistributedLock {

    public static final Logger logger = LoggerFactory.getLogger(DistributedLock.class);
    /**
     * Default lock (lease) timeout, in seconds.
     */
    private static final long DEFAULT_TIMEOUT_SECOND = 5;
    /**
     * Default time to wait while acquiring the lock, in seconds.
     */
    private static final long DEFAULT_WAITE_TIMEOUT_SECOND = 5;
    /**
     * Pause between acquisition attempts, in milliseconds.
     */
    private static final long LOOP_WAIT_TIME_MILLISECOND = 30;
    @Autowired
    @Qualifier("redisCacheService")
    private BaseCacheService cacheService;

    /**
     * Acquire the lock, waiting at most the given wait timeout.
     *
     * @param timeoutSecond lease timeout; null or non-positive uses the default
     * @param waiteTimeoutSecond max wait; null or non-positive uses the default
     * @return the lock value (lease expiry in millis), or -1 when the wait timed out
     */
    public long lock(String key, Long timeoutSecond, Long waiteTimeoutSecond) {

        logger.info("Thread:" + Thread.currentThread().getName() + " start lock");
        long beganTime = System.currentTimeMillis() / 1000;
        // Normalize invalid or missing arguments to the defaults.
        if (timeoutSecond != null && timeoutSecond <= 0) {
            timeoutSecond = DEFAULT_TIMEOUT_SECOND;
        }
        timeoutSecond = timeoutSecond == null ? DEFAULT_TIMEOUT_SECOND : timeoutSecond;
        if (waiteTimeoutSecond != null && waiteTimeoutSecond <= 0) {
            waiteTimeoutSecond = DEFAULT_WAITE_TIMEOUT_SECOND;
        }
        waiteTimeoutSecond =
                waiteTimeoutSecond == null ? DEFAULT_WAITE_TIMEOUT_SECOND : waiteTimeoutSecond;
        while (true) {
            // Give up once the wait budget is exhausted.
            long endTime = System.currentTimeMillis() / 1000;
            if ((endTime - beganTime) >= waiteTimeoutSecond) {
                return -1L;
            }
            // Lease expiry for this attempt, based on the cache server's clock.
            long timeoutTimeMilli = cacheService.getCurrentTimeMilliForCache() + timeoutSecond * 1000;

            if (tryAcquire(key, timeoutTimeMilli)) {
                logger.info("Thread:" + Thread.currentThread().getName() + " lock success");
                return timeoutTimeMilli;
            }

            sleepBetweenAttempts();
        }
    }

    /**
     * Acquire the lock, waiting indefinitely until it is obtained.
     *
     * @param timeoutSecond lease timeout; null or non-positive uses the default
     * @return the lock value (lease expiry in millis)
     */
    public long lock(String key, Long timeoutSecond) {

        logger.info("Thread:" + Thread.currentThread().getName() + " start lock");

        // Normalize invalid or missing arguments to the default.
        if (timeoutSecond != null && timeoutSecond <= 0) {
            timeoutSecond = DEFAULT_TIMEOUT_SECOND;
        }
        timeoutSecond = timeoutSecond == null ? DEFAULT_TIMEOUT_SECOND : timeoutSecond;
        while (true) {
            // Lease expiry for this attempt, based on the cache server's clock.
            long timeoutTimeMilli = cacheService.getCurrentTimeMilliForCache() + timeoutSecond * 1000;

            if (tryAcquire(key, timeoutTimeMilli)) {
                logger.info("Thread:" + Thread.currentThread().getName() + " lock success");
                return timeoutTimeMilli;
            }

            sleepBetweenAttempts();
        }
    }

    /**
     * Single acquisition attempt shared by both lock() overloads.
     *
     * @return true when this thread obtained the lock
     */
    private boolean tryAcquire(String key, long timeoutTimeMilli) {
        // Fast path: the key is absent, so this thread takes the lock.
        if (cacheService.setIfAbsent(key, timeoutTimeMilli)) {
            return true;
        }
        // The key exists: check whether the current holder's lease has expired.
        Long value = cacheService.getVal(key, Long.class);
        if (value != null && value.longValue() < cacheService.getCurrentTimeMilliForCache()) {
            // Take over the expired lease with getAndSet; when several threads
            // race here, only the first one gets the old (expired) value back.
            Long oldValue = cacheService.getAndSet(key, timeoutTimeMilli);
            return value.equals(oldValue);
        }
        return false;
    }

    /**
     * Delay briefly between attempts so Redis is not hammered.
     */
    private void sleepBetweenAttempts() {
        try {
            Thread.sleep(LOOP_WAIT_TIME_MILLISECOND);
        } catch (InterruptedException e) {
            // Deliberately NOT restoring the interrupt flag: doing so would make
            // every later sleep throw immediately and turn the acquisition loop
            // into a busy spin. Matches the original behaviour.
            logger.error("DistributedLock lock sleep error", e);
        }
    }

    /**
     * Release the lock, but only if it still holds the value this thread set
     * (i.e. the lease was not taken over after expiring).
     *
     * <p>NOTE(review): the get-then-delete below is not atomic; a takeover
     * between the two calls could delete another thread's lock. Unchanged from
     * the original implementation.
     */
    public void unLock(String key, long lockValue) {

        logger.info("Thread:" + Thread.currentThread().getName() + " start unlock");
        Long value = cacheService.getVal(key, Long.class);
        if (value != null && value.equals(lockValue)) { // only the locking owner may release
            cacheService.deleteVal(key);
            logger.info("Thread:" + Thread.currentThread().getName() + " unlock success");
        }
    }
}

nginx中的root与alias的差别

格式

nginx指定文件路径有两种方式root和alias,指令的使用方法和作用域:
[root]
语法:root path
默认值:root html
配置段:http、server、location、if
[alias]
语法:alias path
配置段:location

root与alias主要区别

在于nginx如何解释location后面的uri,这会使两者分别以不同的方式将请求映射到服务器文件上。
root的处理结果是:root路径 + location路径
alias的处理结果是:使用alias路径替换location路径
alias是一个目录别名的定义,root则是最上层目录的定义。
还有一个重要的区别是alias后面必须要用“/”结束,否则会找不到文件的,而root则可有可无。

例:

1
2
3
4
5
6
7
8
9
10
# 如果一个请求的URI是/t/a.html时,web服务器将会返回服务器上的/www/root/html/t/a.html的文件。
location ^~ /t/ {
root /www/root/html/;
}

# 如果一个请求的URI是/t/a.html时,web服务器将会返回服务器上的/www/root/html/new_t/a.html的文件。
# 注意这里是new_t,因为alias会把location后面配置的路径丢弃掉,把当前匹配到的目录指向到指定的目录。
location ^~ /t/ {
alias /www/root/html/new_t/;
}

注意

  1. 使用alias时,目录名后面一定要加”/“。
  2. alias在使用正则匹配时,必须捕捉要匹配的内容并在指定的内容处使用。
  3. alias只能位于location块中。(root可以不放在location中)

搬运工:
文章为: nginx.cn原创,转载请注明本文地址: http://www.nginx.cn/4658.html

CentOS7中Redis的安装与基本配置

安装

1
2
3
sudo yum install redis # 看yum源中是否有redis,我试的没有。不过应该是没有的,要从源码编译安装
sudo yum install epel-release # 下载fedora的epel库
sudo yum install redis # 再次安装

启动、关闭、连接

1
2
3
4
5
6
7
8
9
10
11
12
# 启动redis
service redis start
# 停止redis
service redis stop
# 查看redis运行状态
service redis status
# 查看redis进程
ps -ef | grep redis
# 进入本机redis
redis-cli
# 列出所有key
keys *

配置开机启动

1
2
# 设置redis为开机自动启动
chkconfig redis on

从防火墙开放端口

1
2
sudo firewall-cmd --zone=public --add-port=6379/tcp --permanent # 开放6379端口
sudo firewall-cmd --reload # 重新加载配置

开启远程访问

默认是只能在本机访问、所以通过网络访问需要额外的配置。配置文件的目录在:/etc/redis.conf。然后分别修改下面两处配置:

这里写图片描述

这里写图片描述

测试

1
2
3
4
5
6
7
8
9
10
11
12
13
import redis.clients.jedis.Jedis;

/**
 * Smoke test for the remote Redis instance: pings it and prints the reply.
 */
public class TestRedis {
    public static void main(String[] args) {
        // try-with-resources: Jedis implements Closeable, so the connection is
        // released even if ping() throws.
        try (Jedis redis = new Jedis("192.168.102.6")) {
            System.out.println(redis.ping());
        }
    }
}
/* 输出:
* PONG
*
* Process finished with exit code 0
*/

参考:
https://www.cnblogs.com/rslai/p/8249812.html
https://blog.csdn.net/xujian_2001/article/details/78927706

nginx实现Tomcat的负载均衡集群

在Linux中安装时需要自行下载源代码、安装依赖,然后编译、安装。在macOS中,有一个简便的方式,那就是使用homebrew。在Linux中安装以及配置nginx的连接为:https://blog.csdn.net/asahinokawa/article/details/82288567。虽然用了
阅读更多

反向代理为什么叫反向代理

Nginx可以实现端口转发,这又叫反向代理。那么什么叫反向代理呢?一开始特别不理解,然后去找一些答案时,发现一个特别形象和容易懂的回答。来自知乎中对此问题的一个回答:https://www.zhihu.com/question/24723688

这里写图片描述

这里写图片描述

正向代理隐藏真实客户端,反向代理隐藏真实服务端

自己的理解:
正向代理就是我们平常使用的那种代理软件的效果,我们将自己的请求发给代理服务器,然后再由代理服务器发送给目标服务器;反向代理,顾名思义,方向相反,当代理服务器收到请求后,依照某种规则,转发给不同的服务器。因此,结合上面两幅生动形象、网友所给的图,可以非常容易理解为什么叫反向代理。

nginx的安装与配置、使用

安装依赖库sudo yum -y install gcc pcre zlib zlib-devel openssl openssl-devel下载&安装下载页面:http://nginx.org/en/download.html此处的下载版本为:http://nginx.org/downlo
阅读更多

基于Netty实现局域网内自动组网

这种功能的实现首先考虑到的是广/多播,然后通过所受到的广播,获取到发送某种广播的ip地址,即实现“发现设备”功能。得到IP,即完成组网功能。多播与广播在这里选择的是多播。选项|单播|多播(组播)|广播:|:|:|:描述|主机之间一对一的通讯模式,网络中的交换机和路由器对数据只进行转发不进行复制。|主
阅读更多