一种基于redis的分布式锁的实现(当前项目中在使用)
主要思想是利用redis执行命令时的单线程特性。
/**
* 分布式锁
*/
@Component
public class DistributedLock {
public static final Logger logger = LoggerFactory.getLogger(DistributedLock.class);
/**
* 加锁默认超时时间
*/
private static final long DEFAULT_TIMEOUT_SECOND = 5;
/**
* 获取所等待的时间
*/
private static final long DEFAULT_WAITE_TIMEOUT_SECOND = 5;
/**
* 加锁循环等待时间
*/
private static final long LOOP_WAIT_TIME_MILLISECOND = 30;
@Autowired
@Qualifier("redisCacheService")
private BaseCacheService cacheService;
/**
* 有超时等待的加锁
*
* @param timeoutSecond 如果为null,使用默认超时时间
* @param waiteTimeoutSecond 若果为null,使用默认超时时间
* @return 加锁的值(超时时间:-1表示获取失败,超时)
*/
public long lock(String key, Long timeoutSecond, Long waiteTimeoutSecond) {
logger.info("Thread:" + Thread.currentThread().getName() + " start lock");
long beganTime = System.currentTimeMillis() / 1000;
//如果参数错误
if (timeoutSecond != null && timeoutSecond <= 0) {
timeoutSecond = DEFAULT_TIMEOUT_SECOND;
}
timeoutSecond = timeoutSecond == null ? DEFAULT_TIMEOUT_SECOND : timeoutSecond;
if (waiteTimeoutSecond != null && waiteTimeoutSecond <= 0) {
waiteTimeoutSecond = DEFAULT_WAITE_TIMEOUT_SECOND;
}
waiteTimeoutSecond =
waiteTimeoutSecond == null ? DEFAULT_WAITE_TIMEOUT_SECOND : waiteTimeoutSecond;
while (true) {
//等待超时判断
long endTime = System.currentTimeMillis() / 1000;
if ((endTime - beganTime) >= waiteTimeoutSecond) {
return -1l;
}
//超时时间点
long timeoutTimeMilli = cacheService.getCurrentTimeMilliForCache() + timeoutSecond * 1000;
//如果设置成功
if (cacheService.setIfAbsent(key, timeoutTimeMilli)) {
logger.info("Thread:" + Thread.currentThread().getName() + " lock success");
return timeoutTimeMilli;
}
//如果已经超时
Long value = cacheService.getVal(key, Long.class);
if (value != null && value.longValue() < cacheService.getCurrentTimeMilliForCache()) {
//设置新的超时时间
Long oldValue = cacheService.getAndSet(key, timeoutTimeMilli);
//多个线程同时getset,只有第一个才可以获取到锁
if (value.equals(oldValue)) {
logger.info("Thread:" + Thread.currentThread().getName() + " lock success");
return timeoutTimeMilli;
}
}
//延迟一定毫秒,防止请求太频繁
try {
Thread.sleep(LOOP_WAIT_TIME_MILLISECOND);
} catch (InterruptedException e) {
logger.error("DistributedLock lock sleep error", e);
}
}
}
/**
* 无超时等待的加锁
*
* @param timeoutSecond 如果为null,使用默认超时时间
* @return 加锁的值(超时时间)
*/
public long lock(String key, Long timeoutSecond) {
logger.info("Thread:" + Thread.currentThread().getName() + " start lock");
//如果参数错误
if (timeoutSecond != null && timeoutSecond <= 0) {
timeoutSecond = DEFAULT_TIMEOUT_SECOND;
}
timeoutSecond = timeoutSecond == null ? DEFAULT_TIMEOUT_SECOND : timeoutSecond;
while (true) {
//超时时间点
long timeoutTimeMilli = cacheService.getCurrentTimeMilliForCache() + timeoutSecond * 1000;
//如果设置成功
// 若在redis中、没有相应的key值,那么可以认为,当前线程即或得该锁。
if (cacheService.setIfAbsent(key, timeoutTimeMilli)) {
logger.info("Thread:" + Thread.currentThread().getName() + " lock success");
return timeoutTimeMilli;
}
//如果已经超时
// 此时该key在redis已存在,获取该key的值
Long value = cacheService.getVal(key, Long.class);
if (value != null && value.longValue() < cacheService.getCurrentTimeMilliForCache()) {
//设置新的超时时间
// 如果此时获取到的oldValue,与前面获取的value相同,则说明该线程可以获得锁
// 获得锁后,set进新值。
Long oldValue = cacheService.getAndSet(key, timeoutTimeMilli);
//多个线程同时getset,只有第一个才可以获取到锁
if (value.equals(oldValue)) {
logger.info("Thread:" + Thread.currentThread().getName() + " lock success");
return timeoutTimeMilli;
}
}
//延迟一定毫秒,防止请求太频繁
try {
Thread.sleep(LOOP_WAIT_TIME_MILLISECOND);
} catch (InterruptedException e) {
logger.error("DistributedLock lock sleep error", e);
}
}
}
/**
* 释放锁
*/
public void unLock(String key, long lockValue) {
logger.info("Thread:" + Thread.currentThread().getName() + " start unlock");
Long value = cacheService.getVal(key, Long.class);
if (value != null && value.equals(lockValue)) {//如果是本线程加锁
cacheService.deleteVal(key);
logger.info("Thread:" + Thread.currentThread().getName() + " unlock success");
}
}
}