一种基于redis的分布式锁的实现(当前项目中在使用)

主要思想是利用redis执行命令时的单线程特性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
/**
* 分布式锁
*/
@Component
public class DistributedLock {

public static final Logger logger = LoggerFactory.getLogger(DistributedLock.class);
/**
* 加锁默认超时时间
*/
private static final long DEFAULT_TIMEOUT_SECOND = 5;
/**
* 获取所等待的时间
*/
private static final long DEFAULT_WAITE_TIMEOUT_SECOND = 5;
/**
* 加锁循环等待时间
*/
private static final long LOOP_WAIT_TIME_MILLISECOND = 30;
@Autowired
@Qualifier("redisCacheService")
private BaseCacheService cacheService;

/**
* 有超时等待的加锁
*
* @param timeoutSecond 如果为null,使用默认超时时间
* @param waiteTimeoutSecond 若果为null,使用默认超时时间
* @return 加锁的值(超时时间:-1表示获取失败,超时)
*/
public long lock(String key, Long timeoutSecond, Long waiteTimeoutSecond) {

logger.info("Thread:" + Thread.currentThread().getName() + " start lock");
long beganTime = System.currentTimeMillis() / 1000;
//如果参数错误
if (timeoutSecond != null && timeoutSecond <= 0) {
timeoutSecond = DEFAULT_TIMEOUT_SECOND;
}
timeoutSecond = timeoutSecond == null ? DEFAULT_TIMEOUT_SECOND : timeoutSecond;
if (waiteTimeoutSecond != null && waiteTimeoutSecond <= 0) {
waiteTimeoutSecond = DEFAULT_WAITE_TIMEOUT_SECOND;
}
waiteTimeoutSecond =
waiteTimeoutSecond == null ? DEFAULT_WAITE_TIMEOUT_SECOND : waiteTimeoutSecond;
while (true) {
//等待超时判断
long endTime = System.currentTimeMillis() / 1000;
if ((endTime - beganTime) >= waiteTimeoutSecond) {
return -1l;
}
//超时时间点
long timeoutTimeMilli = cacheService.getCurrentTimeMilliForCache() + timeoutSecond * 1000;

//如果设置成功
if (cacheService.setIfAbsent(key, timeoutTimeMilli)) {
logger.info("Thread:" + Thread.currentThread().getName() + " lock success");
return timeoutTimeMilli;
}

//如果已经超时
Long value = cacheService.getVal(key, Long.class);
if (value != null && value.longValue() < cacheService.getCurrentTimeMilliForCache()) {

//设置新的超时时间
Long oldValue = cacheService.getAndSet(key, timeoutTimeMilli);

//多个线程同时getset,只有第一个才可以获取到锁
if (value.equals(oldValue)) {
logger.info("Thread:" + Thread.currentThread().getName() + " lock success");
return timeoutTimeMilli;
}
}

//延迟一定毫秒,防止请求太频繁
try {
Thread.sleep(LOOP_WAIT_TIME_MILLISECOND);
} catch (InterruptedException e) {
logger.error("DistributedLock lock sleep error", e);
}
}
}

/**
* 无超时等待的加锁
*
* @param timeoutSecond 如果为null,使用默认超时时间
* @return 加锁的值(超时时间)
*/
public long lock(String key, Long timeoutSecond) {

logger.info("Thread:" + Thread.currentThread().getName() + " start lock");

//如果参数错误
if (timeoutSecond != null && timeoutSecond <= 0) {
timeoutSecond = DEFAULT_TIMEOUT_SECOND;
}
timeoutSecond = timeoutSecond == null ? DEFAULT_TIMEOUT_SECOND : timeoutSecond;
while (true) {
//超时时间点
long timeoutTimeMilli = cacheService.getCurrentTimeMilliForCache() + timeoutSecond * 1000;

//如果设置成功
// 若在redis中、没有相应的key值,那么可以认为,当前线程即或得该锁。
if (cacheService.setIfAbsent(key, timeoutTimeMilli)) {
logger.info("Thread:" + Thread.currentThread().getName() + " lock success");
return timeoutTimeMilli;
}

//如果已经超时
// 此时该key在redis已存在,获取该key的值
Long value = cacheService.getVal(key, Long.class);
if (value != null && value.longValue() < cacheService.getCurrentTimeMilliForCache()) {

//设置新的超时时间
// 如果此时获取到的oldValue,与前面获取的value相同,则说明该线程可以获得锁
// 获得锁后,set进新值。
Long oldValue = cacheService.getAndSet(key, timeoutTimeMilli);

//多个线程同时getset,只有第一个才可以获取到锁
if (value.equals(oldValue)) {
logger.info("Thread:" + Thread.currentThread().getName() + " lock success");
return timeoutTimeMilli;
}
}

//延迟一定毫秒,防止请求太频繁
try {
Thread.sleep(LOOP_WAIT_TIME_MILLISECOND);
} catch (InterruptedException e) {
logger.error("DistributedLock lock sleep error", e);
}
}
}

/**
* 释放锁
*/
public void unLock(String key, long lockValue) {

logger.info("Thread:" + Thread.currentThread().getName() + " start unlock");
Long value = cacheService.getVal(key, Long.class);
if (value != null && value.equals(lockValue)) {//如果是本线程加锁
cacheService.deleteVal(key);
logger.info("Thread:" + Thread.currentThread().getName() + " unlock success");
}
}
}

一种基于redis的分布式锁的实现(当前项目中在使用)

https://eucham.me/2019/02/27/63bdd864316e.html

作者

遇寻

发布于

2019-02-27

更新于

2021-02-09

许可协议

评论